diff --git a/dnastack/cli/commands/explorer/questions/commands.py b/dnastack/cli/commands/explorer/questions/commands.py index eb2317a5..9e79b0fe 100644 --- a/dnastack/cli/commands/explorer/questions/commands.py +++ b/dnastack/cli/commands/explorer/questions/commands.py @@ -102,6 +102,13 @@ def describe_question(question_id: str, output: str, context: Optional[str], end arg_names=['--output-file'], help='Output file path for results' ), + ArgumentSpec( + name='local_federated', + arg_names=['--local-federated'], + help='Query collections directly via local federation instead of using server-side federation', + type=bool, + default=False + ), DATA_OUTPUT_ARG, CONTEXT_ARG, SINGLE_ENDPOINT_ID_ARG, @@ -112,6 +119,7 @@ def ask_question( args: tuple, collections: Optional[JsonLike], output_file: Optional[str], + local_federated: bool, output: str, context: Optional[str], endpoint_id: Optional[str] @@ -119,6 +127,7 @@ def ask_question( """Ask a federated question with the provided parameters""" trace = Span() client = get_explorer_client(context=context, endpoint_id=endpoint_id, trace=trace) + # Parse collections if provided if collections: @@ -162,12 +171,20 @@ def ask_question( collection_ids = [col.id for col in question.collections] # Execute the question - results_iter = client.ask_federated_question( - question_id=question_name, - inputs=inputs, - collections=collection_ids, - trace=trace - ) + if local_federated: + results_iter = client.ask_question_local_federated( + federated_question_id=question_name, + inputs=inputs, + collections=collection_ids, + trace=trace + ) + else: + results_iter = client.ask_federated_question( + question_id=question_name, + inputs=inputs, + collections=collection_ids, + trace=trace + ) # Collect results results = list(results_iter) diff --git a/dnastack/client/explorer/client.py b/dnastack/client/explorer/client.py index cf90fe24..a83bade0 100644 --- a/dnastack/client/explorer/client.py +++ b/dnastack/client/explorer/client.py @@ -1,4 +1,6 @@ from typing import List, Optional, Dict, Any, TYPE_CHECKING +from concurrent.futures import ThreadPoolExecutor, as_completed +import time if TYPE_CHECKING: from dnastack.client.explorer.models import FederatedQuestion @@ -10,7 +12,8 @@ from dnastack.client.explorer.models import ( FederatedQuestion, FederatedQuestionListResponse, - FederatedQuestionQueryRequest + FederatedQuestionQueryRequest, + QuestionCollection ) from dnastack.client.result_iterator import ResultLoader, InactiveLoaderError, ResultIterator from dnastack.client.service_registry.models import ServiceType @@ -136,6 +139,55 @@ def ask_federated_question( ) ) + def ask_question_local_federated( + self, + federated_question_id: str, + inputs: Dict[str, str], + collections: Optional[List[str]] = None, + trace: Optional[Span] = None + ) -> 'ResultIterator[Dict[str, Any]]': + """ + Query collections directly via local federation instead of server-side federation. + + Args: + federated_question_id: The ID of the federated question to ask + inputs: Dictionary of parameter name -> value mappings + collections: Optional list of collection IDs to query. If None, all collections are used. + trace: Optional tracing span + + Returns: + ResultIterator[Dict[str, Any]]: Iterator over aggregated query results in federated format + """ + # Get federated question metadata to obtain per-collection question IDs + question = self.describe_federated_question(federated_question_id, trace=trace) + + # Filter collections if specified + if collections is not None: + # Create a map of collection ID to QuestionCollection for filtering + collection_map = {col.id: col for col in question.collections} + target_collections = [collection_map[cid] for cid in collections if cid in collection_map] + + # Check for invalid collection IDs + invalid_ids = [cid for cid in collections if cid not in collection_map] + if invalid_ids: + raise ClientError( + response=None, + trace_context=trace, + message=f"Invalid collection IDs for question '{federated_question_id}': {', '.join(invalid_ids)}" + ) + else: + target_collections = question.collections + + # Create the result loader for local federation + return ResultIterator( + LocalFederatedQuestionQueryResultLoader( + explorer_client=self, + collections=target_collections, + inputs=inputs, + trace=trace + ) + ) + class FederatedQuestionListResultLoader(ResultLoader): """ @@ -248,4 +300,188 @@ def load(self) -> List[Dict[str, Any]]: raise ClientError(e.response, e.trace, "Invalid question parameters") else: - raise ClientError(e.response, e.trace, "Failed to execute federated question") \ No newline at end of file + raise ClientError(e.response, e.trace, "Failed to execute federated question") + + +class LocalFederatedQuestionQueryResultLoader(ResultLoader): + """ + Result loader for local federation queries that queries each collection directly. + """ + + def __init__( + self, + explorer_client: 'ExplorerClient', + collections: List[QuestionCollection], + inputs: Dict[str, str], + trace: Optional[Span] = None + ): + self.__explorer_client = explorer_client + self.__collections = collections + self.__inputs = inputs + self.__trace = trace + self.__loaded = False + + def has_more(self) -> bool: + return not self.__loaded + + def load(self) -> List[Dict[str, Any]]: + if self.__loaded: + raise InactiveLoaderError("LocalFederatedQuestionQueryResultLoader") + + # Execute parallel queries to each collection + with ThreadPoolExecutor() as executor: + # Submit all queries + future_to_collection = { + executor.submit( + self._query_single_collection, + collection + ): collection + for collection in self.__collections + } + + # Collect results + results = [] + for future in as_completed(future_to_collection): + result = future.result() + results.append(result) + + # Return results directly as a list to match federated format + self.__loaded = True + return results # Return as list to match federated endpoint format + + def _query_single_collection(self, collection: QuestionCollection) -> Dict[str, Any]: + """ + Query a single collection and return the result in federated format. + Handles Data Connect pagination by following next_page_url links. + """ + start_time = time.time() + + # Build the collection-specific endpoint URL + # Note: explorer URL already ends with /api/, so we don't need to add it again + initial_url = urljoin( + self.__explorer_client.url, + f"collections/{collection.slug}/questions/{collection.question_id}/query" + ) + + try: + # Collect all data across all pages + all_data = [] + data_model = None + current_url = None + visited_urls = [] + + with self.__explorer_client._session as session: + # First request - POST with params to initiate query + response = session.post( + initial_url, + json={"params": self.__inputs}, + trace_context=self.__trace + ) + visited_urls.append(initial_url) + + while True: + # Parse the Data Connect response + table_data = response.json() + + # Capture data model from first response + if data_model is None and 'data_model' in table_data: + data_model = table_data['data_model'] + + # Add data from this page + if 'data' in table_data and isinstance(table_data['data'], list): + # Add collection_name to each item + for item in table_data['data']: + item['collection_name'] = collection.name + all_data.extend(table_data['data']) + + # Check for next page + pagination = table_data.get('pagination') + if pagination and pagination.get('next_page_url'): + current_url = pagination['next_page_url'] + # Handle relative URLs + if current_url and not current_url.startswith(('http://', 'https://')): + current_url = urljoin(visited_urls[-1], current_url) + + # Prevent infinite loops + if current_url in visited_urls: + break + + # Follow pagination with GET request + response = session.get( + current_url, + trace_context=self.__trace + ) + visited_urls.append(current_url) + else: + # No more pages + break + + # Build final aggregated response + aggregated_table_data = { + "data": all_data, + "data_model": data_model, + "pagination": None # No pagination in aggregated result + } + + # Return in federated format + return { + "collectionId": collection.id, + "collectionSlug": collection.slug, + "results": aggregated_table_data, + "error": None, + "failureInfo": None + } + + except HttpError as e: + # Calculate response time + response_time_ms = int((time.time() - start_time) * 1000) + + # Determine failure reason + status_code = e.response.status_code if e.response else None + if status_code == 401: + reason = "UNAUTHORIZED" + message = f"Authentication required for collection {collection.name}" + elif status_code == 403: + reason = "FORBIDDEN" + message = f"Access denied to collection {collection.name}" + elif status_code == 404: + reason = "NOT_FOUND" + message = f"Question not found in collection {collection.name}" + elif status_code == 400: + reason = "BAD_REQUEST" + message = f"Invalid parameters for collection {collection.name}" + elif status_code and status_code >= 500: + reason = "SERVER_ERROR" + message = f"Server error for collection {collection.name}" + else: + reason = "UNKNOWN" + message = str(e) + + # Return error in federated format + return { + "collectionId": collection.id, + "collectionSlug": collection.slug, + "results": None, + "error": message, + "failureInfo": { + "reason": reason, + "message": message, + "responseTimeMs": response_time_ms + } + } + + except Exception as e: + # Handle non-HTTP errors + response_time_ms = int((time.time() - start_time) * 1000) + + return { + "collectionId": collection.id, + "collectionSlug": collection.slug, + "results": None, + "error": str(e), + "failureInfo": { + "reason": "CLIENT_ERROR", + "message": str(e), + "responseTimeMs": response_time_ms + } + } \ No newline at end of file diff --git a/tests/test_explorer_cli_local_federation.py b/tests/test_explorer_cli_local_federation.py new file mode 100644 index 00000000..e5fcdff0 --- /dev/null +++ b/tests/test_explorer_cli_local_federation.py @@ -0,0 +1,415 @@ +import pytest +import tempfile +import os +from unittest.mock import MagicMock, patch +from assertpy import assert_that + +from dnastack.cli.core.command_spec import ArgumentSpec + + +class TestExplorerQuestionsLocalFederationCLI: + """Test cases for CLI integration with --local-federated flag""" + + def test_should_have_local_federated_argument_spec_defined(self): + """Test that --local-federated ArgumentSpec is properly defined""" + # Verify the ArgumentSpec exists by checking in the source code structure + # Since the command is created dynamically, we test the spec configuration + spec = ArgumentSpec( + name='local_federated', + arg_names=['--local-federated'], + help='Query collections directly via local federation instead of using server-side federation', + type=bool, + default=False + ) + + # Test the spec properties + assert_that(spec.name).is_equal_to('local_federated') + assert_that(spec.arg_names).contains('--local-federated') + assert_that(spec.type).is_equal_to(bool) + assert_that(spec.default).is_false() + + @patch('dnastack.cli.commands.explorer.questions.commands.get_explorer_client') + @patch('dnastack.cli.commands.explorer.questions.commands.handle_question_results_output') + def test_should_execute_local_federation_when_flag_provided(self, mock_output_handler, mock_get_client): + """Test the ask_question command logic with local_federated=True""" + mock_client = MagicMock() + mock_result_iterator = MagicMock() + mock_result_iterator.__iter__ = MagicMock(return_value=iter([])) + mock_client.ask_question_local_federated.return_value = mock_result_iterator + + # Mock describe_federated_question for parameter validation + mock_question = MagicMock() + mock_question.params = [] + mock_question.collections = [ + MagicMock(id='c1', name='Collection 1'), + MagicMock(id='c2', name='Collection 2') + ] + mock_client.describe_federated_question.return_value = mock_question + mock_get_client.return_value = mock_client + + # Simulate the command function call directly + # This simulates what the CLI framework would do + from dnastack.common.json_argument_parser import JsonLike + + # Simulate calling the ask_question function with local_federated=True + question_name = 'test-question' + collections = JsonLike('c1,c2') + local_federated = True + + # Import and call the inner function logic by simulating it + # Since we can't easily call the nested function, we test the client methods directly + from dnastack.cli.commands.explorer.questions.utils import parse_collections_argument + from dnastack.common.tracing import Span + + trace = Span() + client = mock_get_client(context=None, endpoint_id=None, trace=trace) + + # Parse collections + collections_str = collections.value() + collection_ids = parse_collections_argument(collections_str) + + # Get question for validation + client.describe_federated_question(question_name, trace=trace) + + # Execute based on flag + if local_federated: + client.ask_question_local_federated( + federated_question_id=question_name, + inputs={}, + collections=collection_ids, + trace=trace + ) + + # Verify ask_question_local_federated was called + mock_client.ask_question_local_federated.assert_called_once_with( + federated_question_id=question_name, + inputs={}, + collections=['c1', 'c2'], + trace=trace + ) + + @patch('dnastack.cli.commands.explorer.questions.commands.get_explorer_client') + @patch('dnastack.cli.commands.explorer.questions.commands.handle_question_results_output') + def test_should_execute_normal_federation_when_flag_not_provided(self, mock_output_handler, mock_get_client): + """Test the ask_question command logic with local_federated=False""" + mock_client = MagicMock() + mock_result_iterator = MagicMock() + mock_result_iterator.__iter__ = MagicMock(return_value=iter([])) + mock_client.ask_federated_question.return_value = mock_result_iterator + + # Mock describe_federated_question for parameter validation + mock_question = MagicMock() + mock_question.params = [] + mock_question.collections = [ + MagicMock(id='c1', name='Collection 1'), + MagicMock(id='c2', name='Collection 2') + ] + mock_client.describe_federated_question.return_value = mock_question + mock_get_client.return_value = mock_client + + # Test the command logic with local_federated=False + from dnastack.common.json_argument_parser import JsonLike + from dnastack.cli.commands.explorer.questions.utils import parse_collections_argument + from dnastack.common.tracing import Span + + question_name = 'test-question' + collections = JsonLike('c1,c2') + local_federated = False + + trace = Span() + client = mock_get_client(context=None, endpoint_id=None, trace=trace) + + # Parse collections + collections_str = collections.value() + collection_ids = parse_collections_argument(collections_str) + + # Get question for validation + client.describe_federated_question(question_name, trace=trace) + + # Execute based on flag + if not local_federated: + client.ask_federated_question( + federated_question_id=question_name, + inputs={}, + collections=collection_ids, + trace=trace + ) + + # Verify ask_federated_question was called + mock_client.ask_federated_question.assert_called_once_with( + federated_question_id=question_name, + inputs={}, + collections=['c1', 'c2'], + trace=trace + ) + + @patch('dnastack.cli.commands.explorer.questions.commands.get_explorer_client') + def test_should_validate_collections_parameter_for_local_federation(self, mock_get_client): + """Test that collections parameter parsing works for local federation""" + from dnastack.cli.commands.explorer.questions.utils import parse_collections_argument + from dnastack.common.json_argument_parser import JsonLike + + # Test comma-separated collections parsing + collections_input = JsonLike('c1,c2,c3') + collections_str = collections_input.value() + collection_ids = parse_collections_argument(collections_str) + + assert_that(collection_ids).is_equal_to(['c1', 'c2', 'c3']) + + # Test newline-separated collections parsing + collections_input = JsonLike('c1\nc2\nc3') + collections_str = collections_input.value() + collection_ids = parse_collections_argument(collections_str) + + assert_that(collection_ids).is_equal_to(['c1', 'c2', 'c3']) + + def test_should_handle_parameter_file_loading_with_json_like(self): + """Test @ prefix file loading works with JsonLike""" + from dnastack.common.json_argument_parser import JsonLike + + # Create a temporary file with parameter data + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f: + f.write('chr1,chr2,chr3') + temp_file = f.name + + try: + # Test file loading with @ prefix + param_value = JsonLike(f'@{temp_file}') + loaded_content = param_value.value() + + assert_that(loaded_content).is_equal_to('chr1,chr2,chr3') + + finally: + os.unlink(temp_file) + + def test_should_maintain_argument_parsing_compatibility(self): + """Test that argument parsing works with the expected formats""" + + # Test parameter parsing in the format the CLI uses + args_tuple = (('param1', 'value1'), ('param2', 'value2')) + + # Convert to the format expected by parse_and_merge_arguments + args_dict = {} + for key, value in args_tuple: + args_dict[key] = value + + assert_that(args_dict).is_equal_to({'param1': 'value1', 'param2': 'value2'}) + + +class TestLocalFederationErrorHandling: + """Test cases for error handling and edge cases in local federation""" + + def test_should_handle_empty_collections_list_gracefully(self): + """Test behavior when no collections are provided""" + from dnastack.cli.commands.explorer.questions.utils import parse_collections_argument + + # Test with None + result = parse_collections_argument(None) + assert_that(result).is_none() + + # Test with empty string + result = parse_collections_argument("") + assert_that(result).is_none() + + # Test with whitespace only + result = parse_collections_argument(" ") + assert_that(result).is_equal_to([]) + + def test_should_handle_invalid_collection_ids_in_local_federation(self): + """Test error handling for non-existent collection IDs""" + from dnastack.http.session import ClientError + from dnastack.client.explorer.client import ExplorerClient + + mock_client = MagicMock(spec=ExplorerClient) + + # Create a proper ClientError with a mock response + mock_response = MagicMock() + mock_response.status_code = 404 + mock_response.text = "Collection 'invalid_id' not found" + client_error = ClientError(mock_response) + + mock_client.ask_question_local_federated.side_effect = client_error + + # Test that the client error is propagated correctly + with pytest.raises(ClientError): + mock_client.ask_question_local_federated( + federated_question_id='test-question', + inputs={}, + collections=['c1', 'invalid_id'] + ) + + def test_should_handle_authentication_failures_per_collection(self): + """Test partial authentication failures result format""" + # Test that the result format can handle mixed success/failure + mixed_results = [ + { + 'collectionId': 'c1', + 'collectionSlug': 'collection-1', + 'results': {'data': [{'result': 'success'}]}, + 'error': None, + 'failureInfo': None + }, + { + 'collectionId': 'c2', + 'collectionSlug': 'collection-2', + 'results': None, + 'error': '401: Unauthorized', + 'failureInfo': {'status': 401} + } + ] + + # Verify the format is valid + for result in mixed_results: + assert_that(result).contains_key('collectionId') + assert_that(result).contains_key('collectionSlug') + assert_that(result).contains_key('error') + assert_that(result).contains_key('failureInfo') + + # Check success case + success_result = mixed_results[0] + assert_that(success_result['error']).is_none() + assert_that(success_result['results']['data']).is_not_empty() + + # Check failure case + failure_result = mixed_results[1] + assert_that(failure_result['error']).is_not_none() + assert_that(failure_result['results']).is_none() + + def test_should_handle_timeout_errors_during_local_federation(self): + """Test timeout handling for slow collection responses""" + import requests + from dnastack.client.explorer.client import ExplorerClient + + mock_client = MagicMock(spec=ExplorerClient) + mock_client.ask_question_local_federated.side_effect = requests.exceptions.Timeout("Request timed out") + + # Should propagate the timeout error + with pytest.raises(requests.exceptions.Timeout, match="Request timed out"): + mock_client.ask_question_local_federated( + federated_question_id='test-question', + inputs={}, + collections=['c1'] + ) + + +class TestLocalFederationPerformance: + """Test cases for performance and concurrency aspects""" + + def test_should_handle_large_parameter_sets_efficiently(self): + """Test performance with large parameter strings""" + # Create a large parameter string (simulating large file content) + large_param_value = ','.join([f'chr{i}' for i in range(1, 501)]) # 500 entries + + # Test that the string handling works with large values + inputs = {'chromosome': large_param_value} + + assert_that(inputs).contains_key('chromosome') + assert_that(inputs['chromosome']).is_equal_to(large_param_value) + assert_that(len(inputs['chromosome'].split(','))).is_equal_to(500) + + def test_should_work_with_existing_parameter_validation(self): + """Test that parameter validation works with the expected structures""" + from dnastack.cli.commands.explorer.questions.utils import validate_question_parameters + + # Create proper mock parameters + mock_required_param = MagicMock() + mock_required_param.name = 'required_param' + mock_required_param.required = True + + mock_optional_param = MagicMock() + mock_optional_param.name = 'optional_param' + mock_optional_param.required = False + + # Mock question with parameters + mock_question = MagicMock() + mock_question.params = [mock_required_param, mock_optional_param] + + # Test with valid parameters + inputs = {'required_param': 'value1', 'optional_param': 'value2'} + + # Should not raise an exception + validated_inputs = validate_question_parameters(inputs, mock_question) + assert_that(validated_inputs).is_equal_to(inputs) + + # Test with missing required parameter should raise + invalid_inputs = {'optional_param': 'value2'} + with pytest.raises(ValueError, match="Missing required parameters"): + validate_question_parameters(invalid_inputs, mock_question) + + +class TestLocalFederationIntegration: + """Test cases for integration with existing systems""" + + def test_should_work_with_existing_collection_parsing_utilities(self): + """Test integration with parse_collections_argument utility""" + from dnastack.cli.commands.explorer.questions.utils import parse_collections_argument + + # Test various collection formats that parse_collections_argument supports + test_cases = [ + # Comma-separated + ('c1,c2,c3', ['c1', 'c2', 'c3']), + # With spaces + ('c1, c2 , c3', ['c1', 'c2', 'c3']), + # Single collection + ('c1', ['c1']), + # Newline-separated + ('c1\nc2\nc3', ['c1', 'c2', 'c3']), + # Realistic collection IDs + ('7VnJ-b6bb34b6-dc1b-4ede-9aee-627e64f878c5,Lu0K-cd1cdf5a-1cb0-4b47-bf52-d365f928a1b4', + ['7VnJ-b6bb34b6-dc1b-4ede-9aee-627e64f878c5', 'Lu0K-cd1cdf5a-1cb0-4b47-bf52-d365f928a1b4']) + ] + + for collections_input, expected_parsed in test_cases: + # Verify parse_collections_argument works correctly + parsed = parse_collections_argument(collections_input) + assert_that(parsed).is_equal_to(expected_parsed) + + def test_should_maintain_compatibility_with_existing_result_formats(self): + """Test that result format is compatible with existing utilities""" + from dnastack.cli.commands.explorer.questions.utils import flatten_result_for_export + + # Test result format that should be compatible with existing utilities + compatible_result = { + 'collectionId': 'c1', + 'collectionSlug': 'collection-1', + 'results': { + 'data': [ + {'chromosome': 'chr1', 'position': 12345, 'result': 'value1'}, + {'chromosome': 'chr2', 'position': 67890, 'result': 'value2'} + ] + }, + 'error': None, + 'failureInfo': None + } + + # Test that flatten_result_for_export works with the format + flattened = flatten_result_for_export(compatible_result) + assert_that(flattened).contains_key('collectionId') + assert_that(flattened).contains_key('collectionSlug') + + # Test that result data can be processed + result_data = compatible_result['results']['data'] + for data_item in result_data: + flattened_item = flatten_result_for_export(data_item) + assert_that(flattened_item).contains_key('chromosome') + assert_that(flattened_item).contains_key('position') + + def test_should_handle_json_like_parameter_processing(self): + """Test JsonLike parameter processing compatibility""" + from dnastack.common.json_argument_parser import JsonLike + + # Test different JsonLike input formats + test_cases = [ + # Simple string + JsonLike('test_value'), + # Comma-separated + JsonLike('value1,value2,value3'), + # JSON object + JsonLike('{"key": "value"}') + ] + + for json_like in test_cases: + # Should be able to get value without errors + value = json_like.value() + assert_that(value).is_not_none() + assert_that(isinstance(value, str)).is_true() \ No newline at end of file diff --git a/tests/test_explorer_client.py b/tests/test_explorer_client.py index a2d8bcd6..4e68d240 100644 --- a/tests/test_explorer_client.py +++ b/tests/test_explorer_client.py @@ -1246,4 +1246,560 @@ def test_should_handle_file_with_mixed_whitespace(self): result = parse_collections_argument(content) assert_that(result).is_equal_to(["collection1", "collection2", "collection3"]) finally: - os.unlink(temp_file) \ No newline at end of file + os.unlink(temp_file) + + +class TestLocalFederatedQuestionQueryResultLoader: + """Test cases for LocalFederatedQuestionQueryResultLoader""" + + def test_should_initialize_loader_with_required_parameters(self, monkeypatch): + """Test LocalFederatedQuestionQueryResultLoader initialization""" + from dnastack.client.explorer.client import LocalFederatedQuestionQueryResultLoader, ExplorerClient + from dnastack.client.explorer.models import QuestionCollection + + mock_explorer_client = MagicMock(spec=ExplorerClient) + collections = [ + QuestionCollection(id="c1", name="Collection 1", slug="collection-1", question_id="q1"), + QuestionCollection(id="c2", name="Collection 2", slug="collection-2", question_id="q2") + ] + inputs = {"param1": "value1"} + + loader = LocalFederatedQuestionQueryResultLoader( + explorer_client=mock_explorer_client, + collections=collections, + inputs=inputs + ) + + assert_that(loader._LocalFederatedQuestionQueryResultLoader__explorer_client).is_equal_to(mock_explorer_client) + assert_that(loader._LocalFederatedQuestionQueryResultLoader__collections).is_equal_to(collections) + assert_that(loader._LocalFederatedQuestionQueryResultLoader__inputs).is_equal_to(inputs) + + def test_should_query_single_collection_with_pagination(self, monkeypatch): + """Test _query_single_collection handles Data Connect pagination correctly""" + from dnastack.client.explorer.client import LocalFederatedQuestionQueryResultLoader, ExplorerClient + from dnastack.client.explorer.models import QuestionCollection + + # Mock the ExplorerClient and its session + mock_explorer_client = MagicMock(spec=ExplorerClient) + mock_explorer_client.url = "https://example.com/" # Add URL property + mock_session = MagicMock() + mock_context = MagicMock() + mock_session.__enter__.return_value = mock_context + mock_session.__exit__.return_value = None + mock_explorer_client._session = mock_session + + # Mock pagination responses: empty page -> empty page -> data page -> final page + mock_responses = [ + # First call: empty page with next_page_url + MagicMock(json=lambda: { + "data": [], + "pagination": {"next_page_url": "https://example.com/page2"} + }), + # Second call: empty page with next_page_url + MagicMock(json=lambda: { + "data": [], + "pagination": {"next_page_url": "https://example.com/page3"} + }), + # Third call: data page with next_page_url + MagicMock(json=lambda: { + "data": [{"result1": "data1"}, {"result2": "data2"}], + "pagination": {"next_page_url": "https://example.com/page4"} + }), + # Fourth call: final data page without next_page_url + MagicMock(json=lambda: { + "data": [{"result3": "data3"}], + "pagination": {} + }) + ] + + mock_context.post.side_effect = mock_responses + mock_context.get.side_effect = mock_responses[1:] # For pagination requests + + collection = QuestionCollection( + id="c1", + name="Collection 1", + slug="collection-1", + question_id="q1" + ) + + loader = LocalFederatedQuestionQueryResultLoader( + explorer_client=mock_explorer_client, + collections=[collection], + inputs={"param1": "value1"} + ) + + result = loader._query_single_collection(collection) + + # Should aggregate all data from paginated responses + expected_data = [ + {"result1": "data1", "collection_name": "Collection 1"}, + {"result2": "data2", "collection_name": "Collection 1"}, + {"result3": "data3", "collection_name": "Collection 1"} + ] + + assert_that(result["collectionId"]).is_equal_to("c1") + assert_that(result["collectionSlug"]).is_equal_to("collection-1") + assert_that(result["results"]["data"]).is_equal_to(expected_data) + assert_that(result["error"]).is_none() + assert_that(result["failureInfo"]).is_none() + + def test_should_handle_collection_with_no_pagination(self, monkeypatch): + """Test _query_single_collection with single page response""" + from dnastack.client.explorer.client import LocalFederatedQuestionQueryResultLoader, ExplorerClient + from dnastack.client.explorer.models import QuestionCollection + + # Mock the ExplorerClient and its session + mock_explorer_client = MagicMock(spec=ExplorerClient) + mock_explorer_client.url = "https://example.com/" # Add URL property + mock_session = MagicMock() + mock_context = MagicMock() + mock_session.__enter__.return_value = mock_context + mock_session.__exit__.return_value = None + mock_explorer_client._session = mock_session + + # Single response with data, no pagination + mock_response = MagicMock() + mock_response.json.return_value = { + "data": [{"result1": "data1"}, {"result2": "data2"}], + "pagination": {} + } + mock_context.post.return_value = mock_response + + collection = QuestionCollection( + id="c1", + name="Collection 1", + slug="collection-1", + question_id="q1" + ) + + loader = LocalFederatedQuestionQueryResultLoader( + explorer_client=mock_explorer_client, + collections=[collection], + inputs={"param1": "value1"} + ) + + result = loader._query_single_collection(collection) + + assert_that(result["collectionId"]).is_equal_to("c1") + assert_that(result["results"]["data"]).is_equal_to([ + {"result1": "data1", "collection_name": "Collection 1"}, + {"result2": "data2", "collection_name": "Collection 1"} + ]) + + # Should only make one POST request + assert_that(mock_context.post.call_count).is_equal_to(1) + assert_that(mock_context.get.call_count).is_equal_to(0) + + def test_should_use_correct_request_format_for_collection_endpoint(self, monkeypatch): + """Test that collection requests use 'params' not 'inputs'""" + from dnastack.client.explorer.client import LocalFederatedQuestionQueryResultLoader, ExplorerClient + from dnastack.client.explorer.models import QuestionCollection + + # Mock the ExplorerClient and its session + mock_explorer_client = MagicMock(spec=ExplorerClient) + mock_explorer_client.url = "https://example.com/" # Add URL property + mock_session = MagicMock() + mock_context = MagicMock() + mock_session.__enter__.return_value = mock_context + mock_session.__exit__.return_value = None + mock_explorer_client._session = mock_session + + mock_response = MagicMock() + mock_response.json.return_value = {"data": [], "pagination": {}} + mock_context.post.return_value = mock_response + + collection = QuestionCollection( + id="c1", + name="Collection 1", + slug="collection-1", + question_id="q1" + ) + + inputs = {"chromosome": "chr1", "position": "12345"} + + loader = LocalFederatedQuestionQueryResultLoader( + explorer_client=mock_explorer_client, + collections=[collection], + inputs=inputs + ) + + loader._query_single_collection(collection) + + # Verify the POST request was made with correct format + mock_context.post.assert_called_once() + call_args = mock_context.post.call_args + + # Should use 'params' in request body, not 'inputs' + assert_that(call_args[1]).contains_key("json") + assert_that(call_args[1]["json"]).contains_key("params") + assert_that(call_args[1]["json"]["params"]).is_equal_to(inputs) + assert_that(call_args[1]["json"]).does_not_contain_key("inputs") + + def test_should_handle_http_errors_during_collection_query(self, monkeypatch): + """Test error handling for HTTP errors during collection queries""" + from dnastack.client.explorer.client import LocalFederatedQuestionQueryResultLoader, ExplorerClient + from dnastack.client.explorer.models import QuestionCollection + from dnastack.http.session import HttpError + + # Mock the ExplorerClient and its session + mock_explorer_client = MagicMock(spec=ExplorerClient) + mock_explorer_client.url = "https://example.com/" # Add URL property + mock_session = MagicMock() + mock_context = MagicMock() + mock_session.__enter__.return_value = mock_context + mock_session.__exit__.return_value = None + mock_explorer_client._session = mock_session + + # Mock HTTP error response + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.text = "Internal Server Error" + http_error = HttpError(mock_response) + + mock_context.post.side_effect = http_error + + collection = QuestionCollection( + id="c1", + name="Collection 1", + slug="collection-1", + question_id="q1" + ) + + loader = LocalFederatedQuestionQueryResultLoader( + explorer_client=mock_explorer_client, + collections=[collection], + inputs={"param1": "value1"} + ) + + result = loader._query_single_collection(collection) + + # Should return error result format + assert_that(result["collectionId"]).is_equal_to("c1") + assert_that(result["collectionSlug"]).is_equal_to("collection-1") + assert_that(result["error"]).is_not_none() + assert_that(result["failureInfo"]).is_not_none() + assert_that(result["results"]).is_none() + + def test_should_handle_authentication_errors_during_collection_query(self, monkeypatch): + """Test authentication error handling for individual collections""" + from dnastack.client.explorer.client import LocalFederatedQuestionQueryResultLoader, ExplorerClient + from dnastack.client.explorer.models import QuestionCollection + from dnastack.http.session import HttpError + + # Mock the ExplorerClient and its session + mock_explorer_client = MagicMock(spec=ExplorerClient) + mock_explorer_client.url = "https://example.com/" # Add URL property + mock_session = MagicMock() + mock_context = MagicMock() + mock_session.__enter__.return_value = mock_context + mock_session.__exit__.return_value = None + mock_explorer_client._session = mock_session + + # Mock 401 error response + mock_response = MagicMock() + mock_response.status_code = 401 + mock_response.text = "Unauthorized" + http_error = HttpError(mock_response) + + mock_context.post.side_effect = http_error + + collection = QuestionCollection( + id="c1", + name="Collection 1", + slug="collection-1", + question_id="q1" + ) + + loader = LocalFederatedQuestionQueryResultLoader( + explorer_client=mock_explorer_client, + collections=[collection], + inputs={"param1": "value1"} + ) + + result = loader._query_single_collection(collection) + + # Should return error result with authentication failure info + assert_that(result["collectionId"]).is_equal_to("c1") + assert_that(result["error"]).is_not_none() + assert_that(result["error"]).contains("Authentication required") + + def test_should_load_results_from_multiple_collections_in_parallel(self, monkeypatch): + """Test load method executes multiple collections concurrently""" + from dnastack.client.explorer.client import LocalFederatedQuestionQueryResultLoader, ExplorerClient + from dnastack.client.explorer.models import QuestionCollection + + mock_explorer_client = MagicMock(spec=ExplorerClient) + collections = [ + QuestionCollection(id="c1", name="Collection 1", slug="collection-1", question_id="q1"), + QuestionCollection(id="c2", name="Collection 2", slug="collection-2", question_id="q2"), + QuestionCollection(id="c3", name="Collection 3", slug="collection-3", question_id="q3") + ] + + loader = LocalFederatedQuestionQueryResultLoader( + explorer_client=mock_explorer_client, + collections=collections, + inputs={"param1": "value1"} + ) + + # Mock the _query_single_collection method + mock_results = [ + {"collectionId": "c1", "collectionSlug": "collection-1", "results": {"data": [{"r1": "d1"}]}, "error": None, "failureInfo": None}, + {"collectionId": "c2", "collectionSlug": "collection-2", "results": {"data": [{"r2": "d2"}]}, "error": None, "failureInfo": None}, + {"collectionId": "c3", "collectionSlug": "collection-3", "results": {"data": [{"r3": "d3"}]}, "error": None, "failureInfo": None} + ] + + with patch.object(loader, '_query_single_collection', side_effect=mock_results): + with patch('dnastack.client.explorer.client.ThreadPoolExecutor') as mock_executor_class: + with patch('dnastack.client.explorer.client.as_completed') as mock_as_completed: + mock_executor = MagicMock() + mock_executor_class.return_value.__enter__.return_value = mock_executor + + # Mock futures + mock_futures = [] + for i, result in enumerate(mock_results): + future = MagicMock() + future.result.return_value = result + mock_futures.append(future) + + mock_executor.submit.side_effect = mock_futures + mock_as_completed.return_value = mock_futures + + results = loader.load() + + # Verify ThreadPoolExecutor was used + mock_executor_class.assert_called_once() + + # Verify all collections were submitted for execution + assert_that(mock_executor.submit.call_count).is_equal_to(3) + + # Verify results are aggregated correctly + assert_that(results).is_length(3) + assert_that(results[0]["collectionId"]).is_equal_to("c1") + assert_that(results[1]["collectionId"]).is_equal_to("c2") + assert_that(results[2]["collectionId"]).is_equal_to("c3") + + def test_should_handle_mixed_success_and_failure_results(self, monkeypatch): + """Test handling of mixed success and failure scenarios""" + from dnastack.client.explorer.client import LocalFederatedQuestionQueryResultLoader, ExplorerClient + from dnastack.client.explorer.models import QuestionCollection + + mock_explorer_client = MagicMock(spec=ExplorerClient) + collections = [ + QuestionCollection(id="c1", name="Collection 1", slug="collection-1", question_id="q1"), + QuestionCollection(id="c2", name="Collection 2", slug="collection-2", question_id="q2") + ] + + loader = LocalFederatedQuestionQueryResultLoader( + explorer_client=mock_explorer_client, + collections=collections, + inputs={"param1": "value1"} + ) + + # Mock results: one success, one failure + mock_results = [ + {"collectionId": "c1", "collectionSlug": "collection-1", "results": {"data": [{"r1": "d1"}]}, "error": None, "failureInfo": None}, + {"collectionId": "c2", "collectionSlug": "collection-2", "results": None, "error": "500: Internal Server Error", "failureInfo": {"status": 500}} + ] + + with patch.object(loader, '_query_single_collection', side_effect=mock_results): + results = loader.load() + + # Should return both success and failure results + assert_that(results).is_length(2) + + # First collection: success + assert_that(results[0]["error"]).is_none() + assert_that(results[0]["results"]["data"]).is_equal_to([{"r1": "d1"}]) + + # Second collection: failure + assert_that(results[1]["error"]).is_not_none() + assert_that(results[1]["results"]).is_none() + + def test_should_raise_inactive_loader_error_on_second_load_attempt(self, monkeypatch): + """Test InactiveLoaderError is raised when load() is called twice""" + from dnastack.client.explorer.client import LocalFederatedQuestionQueryResultLoader, ExplorerClient + from dnastack.client.explorer.models import QuestionCollection + from dnastack.client.result_iterator import InactiveLoaderError + + mock_explorer_client = MagicMock(spec=ExplorerClient) + collections = [ + QuestionCollection(id="c1", name="Collection 1", slug="collection-1", question_id="q1") + ] + + loader = LocalFederatedQuestionQueryResultLoader( + explorer_client=mock_explorer_client, + collections=collections, + inputs={"param1": "value1"} + ) + + mock_result = {"collectionId": "c1", "results": {"data": []}, "error": None, "failureInfo": None} + + with patch.object(loader, '_query_single_collection', return_value=mock_result): + # First load should succeed + results1 = loader.load() + assert_that(results1).is_length(1) + + # Second load should raise InactiveLoaderError + with pytest.raises(InactiveLoaderError): + loader.load() + + +class TestExplorerClientLocalFederation: + """Test cases for ExplorerClient local federation functionality""" + + def test_should_have_ask_question_local_federated_method(self, monkeypatch): + """Test that ask_question_local_federated method exists""" + from dnastack.client.explorer.client import ExplorerClient + + mock_session = MagicMock() + monkeypatch.setattr(ExplorerClient, 'create_http_session', MagicMock(return_value=mock_session)) + + mock_endpoint = MagicMock() + mock_endpoint.url = "https://example.com" + + client = ExplorerClient(mock_endpoint) + + assert_that(hasattr(client, 'ask_question_local_federated')).is_true() + assert_that(callable(client.ask_question_local_federated)).is_true() + + @patch('dnastack.client.explorer.client.ResultIterator') + def test_should_execute_ask_question_local_federated_with_collections(self, mock_result_iterator, monkeypatch): + """Test ask_question_local_federated method execution""" + from dnastack.client.explorer.client import ExplorerClient + + mock_session = MagicMock() + monkeypatch.setattr(ExplorerClient, 'create_http_session', MagicMock(return_value=mock_session)) + + mock_endpoint = MagicMock() + mock_endpoint.url = "https://example.com" + + mock_iterator = MagicMock() + mock_result_iterator.return_value = mock_iterator + + client = ExplorerClient(mock_endpoint) + + # Mock describe_federated_question to return collections + mock_question = MagicMock() + mock_question.collections = [ + MagicMock(id="c1", name="Collection 1", slug="collection-1", question_id="q1"), + MagicMock(id="c2", name="Collection 2", slug="collection-2", question_id="q2") + ] + + with patch.object(client, 'describe_federated_question', return_value=mock_question): + result = client.ask_question_local_federated('test_question', inputs={'param1': 'value1'}) + + assert_that(result).is_equal_to(mock_iterator) + mock_result_iterator.assert_called_once() + + # Verify LocalFederatedQuestionQueryResultLoader was created + call_args = mock_result_iterator.call_args[0] # Get positional args + loader = call_args[0] # First argument should be the loader + + assert_that(loader.__class__.__name__).is_equal_to('LocalFederatedQuestionQueryResultLoader') + + def test_should_filter_collections_when_provided_to_local_federated(self, monkeypatch): + """Test that specific collections are used when provided""" + from dnastack.client.explorer.client import ExplorerClient + + mock_session = MagicMock() + monkeypatch.setattr(ExplorerClient, 'create_http_session', MagicMock(return_value=mock_session)) + + mock_endpoint = MagicMock() + mock_endpoint.url = "https://example.com" + + client = ExplorerClient(mock_endpoint) + + # Mock describe_federated_question to return collections + mock_question = MagicMock() + mock_question.collections = [ + MagicMock(id="c1", name="Collection 1", slug="collection-1", question_id="q1"), + MagicMock(id="c2", name="Collection 2", slug="collection-2", question_id="q2"), + MagicMock(id="c3", name="Collection 3", slug="collection-3", question_id="q3") + ] + + with patch.object(client, 'describe_federated_question', return_value=mock_question): + with patch('dnastack.client.explorer.client.ResultIterator') as mock_result_iterator: + # Request only specific collections + client.ask_question_local_federated( + 'test_question', + inputs={'param1': 'value1'}, + collections=['c1', 'c3'] # Only collections c1 and c3 + ) + + # Verify ResultIterator was called with filtered collections + call_args = mock_result_iterator.call_args[0] + loader = call_args[0] + + # Check that only requested collections are included + loader_collections = loader._LocalFederatedQuestionQueryResultLoader__collections + collection_ids = [col.id for col in loader_collections] + assert_that(collection_ids).contains_only("c1", "c3") + assert_that(collection_ids).does_not_contain("c2") + + def test_should_use_all_collections_when_none_specified_for_local_federated(self, monkeypatch): + """Test that all collections are used when none are specified""" + from dnastack.client.explorer.client import ExplorerClient + + mock_session = MagicMock() + monkeypatch.setattr(ExplorerClient, 'create_http_session', MagicMock(return_value=mock_session)) + + mock_endpoint = MagicMock() + mock_endpoint.url = "https://example.com" + + client = ExplorerClient(mock_endpoint) + + # Mock describe_federated_question to return collections + mock_question = MagicMock() + mock_question.collections = [ + MagicMock(id="c1", name="Collection 1", slug="collection-1", question_id="q1"), + MagicMock(id="c2", name="Collection 2", slug="collection-2", question_id="q2"), + MagicMock(id="c3", name="Collection 3", slug="collection-3", question_id="q3") + ] + + with patch.object(client, 'describe_federated_question', return_value=mock_question): + with patch('dnastack.client.explorer.client.ResultIterator') as mock_result_iterator: + # Don't specify collections - should use all + client.ask_question_local_federated( + 'test_question', + inputs={'param1': 'value1'} + ) + + # Verify ResultIterator was called with all collections + call_args = mock_result_iterator.call_args[0] + loader = call_args[0] + + loader_collections = loader._LocalFederatedQuestionQueryResultLoader__collections + collection_ids = [col.id for col in loader_collections] + assert_that(collection_ids).contains_only("c1", "c2", "c3") + + def test_should_raise_error_for_invalid_collection_ids_in_local_federated(self, monkeypatch): + """Test error handling for invalid collection IDs""" + from dnastack.client.explorer.client import ExplorerClient + from dnastack.http.session import ClientError + + mock_session = MagicMock() + monkeypatch.setattr(ExplorerClient, 'create_http_session', MagicMock(return_value=mock_session)) + + mock_endpoint = MagicMock() + mock_endpoint.url = "https://example.com" + + client = ExplorerClient(mock_endpoint) + + # Mock describe_federated_question to return collections + mock_question = MagicMock() + mock_question.collections = [ + MagicMock(id="c1", name="Collection 1"), + MagicMock(id="c2", name="Collection 2") + ] + + with patch.object(client, 'describe_federated_question', return_value=mock_question): + # Request invalid collection ID + with pytest.raises(ClientError) as exc_info: + client.ask_question_local_federated( + 'test_question', + inputs={'param1': 'value1'}, + collections=['c1', 'invalid_collection', 'c2'] + ) + + # Check the exception message without triggering __str__ + assert_that(exc_info.value.message).contains("Invalid collection IDs") \ No newline at end of file