diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 02c8f71a0935..93cc0e45b393 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -12,12 +12,14 @@ ### Other Changes +- Configuration manager/worker fetch via OneSettings part 1 - Change detection + ([#42360](https://github.com/Azure/azure-sdk-for-python/pull/42360)) +- Configuration manager/worker fetch via OneSettings part 2 - Concurrency and refactoring of _ConfigurationManager + ([#42508](https://github.com/Azure/azure-sdk-for-python/pull/42508)) + ## 1.0.0b41 (2025-07-31) ### Features Added - -- Configuration manager/worker fetch via OneSettings part 1 - ([#42360] https://github.com/Azure/azure-sdk-for-python/pull/42360) - Added RateLimited Sampler ([#41954](https://github.com/Azure/azure-sdk-for-python/pull/41954)) - Refactored Application Insights Sampler Code diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/__init__.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/__init__.py index 1ad92fd6d43b..5a9c20f14c7a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/__init__.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/__init__.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. 
+from dataclasses import dataclass, field from typing import Dict, Optional import logging from threading import Lock @@ -8,6 +9,7 @@ _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS, _ONE_SETTINGS_PYTHON_KEY, _ONE_SETTINGS_CHANGE_URL, + _ONE_SETTINGS_CONFIG_URL, ) from azure.monitor.opentelemetry.exporter._configuration._utils import make_onesettings_request @@ -15,19 +17,32 @@ logger = logging.getLogger(__name__) +@dataclass +class _ConfigurationState: + """Immutable state object for configuration data.""" + etag: str = "" + refresh_interval: int = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS + version_cache: int = -1 + settings_cache: Dict[str, str] = field(default_factory=dict) + + def with_updates(self, **kwargs) -> '_ConfigurationState': # pylint: disable=C4741,C4742 + """Create a new state object with updated values.""" + return _ConfigurationState( + etag=kwargs.get('etag', self.etag), + refresh_interval=kwargs.get('refresh_interval', self.refresh_interval), + version_cache=kwargs.get('version_cache', self.version_cache), + settings_cache=kwargs.get('settings_cache', self.settings_cache.copy()) + ) + + class _ConfigurationManager: """Singleton class to manage configuration settings.""" _instance = None _configuration_worker = None _instance_lock = Lock() - _config_lock = Lock() - _settings_lock = Lock() - _version_lock = Lock() - _etag = None - _refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS - _settings_cache: Dict[str, str] = {} - _version_cache = 0 + _state_lock = Lock() # Single lock for all state + _current_state = _ConfigurationState() def __new__(cls): with cls._instance_lock: @@ -41,81 +56,156 @@ def _initialize_worker(self): """Initialize the ConfigurationManager and start the configuration worker.""" # Lazy import to avoid circular import from azure.monitor.opentelemetry.exporter._configuration._worker import _ConfigurationWorker - self._configuration_worker = _ConfigurationWorker(self._refresh_interval) + + # Get initial refresh 
interval from state + with _ConfigurationManager._state_lock: + initial_refresh_interval = _ConfigurationManager._current_state.refresh_interval + + self._configuration_worker = _ConfigurationWorker(initial_refresh_interval) def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str, str]] = None) -> int: - """Fetch configuration from OneSettings and update local cache. + """Fetch configuration from OneSettings and update local cache atomically. This method performs a conditional HTTP request to OneSettings using the - current ETag for efficient caching. It updates the local configuration - cache with any new settings and manages version tracking for change detection. + current ETag for efficient caching. It atomically updates the local configuration + state with any new settings and manages version tracking for change detection. + + The method implements a check-and-set pattern for thread safety: + 1. Reads current state atomically to prepare request headers + 2. Makes HTTP request to OneSettings CHANGE endpoint outside locks + 3. Re-reads current state to make version comparison decisions + 4. Conditionally fetches from CONFIG endpoint if version increased + 5. Updates all state fields atomically in a single operation + + Version comparison logic: + - Version increase: New configuration available, fetches and caches new settings + - Version same: No changes detected, ETag and refresh interval updated safely + - Version decrease: Unexpected rollback state, logged as warning, no updates applied - The method handles version comparison logic: - - Version increase: New configuration available, cache updated - - Version same: No changes, cache remains unchanged - - Version decrease: Unexpected state, logged as warning - - :param query_dict: Optional query parameters to include - in the OneSettings request. Commonly used for targeting specific - configuration namespaces or environments. 
+ Error handling: + - CONFIG endpoint failure: ETag not updated to preserve retry capability on next call + - Network failures: Handled by make_onesettings_request, returns default values + - Missing settings/version: Logged as warning, only ETag and refresh interval updated + + :param query_dict: Optional query parameters to include in the OneSettings request. + Commonly used for targeting specific configuration namespaces or environments. + If None, defaults to empty dictionary. :type query_dict: Optional[Dict[str, str]] :return: Updated refresh interval in seconds for the next configuration check. + This value comes from the OneSettings response and determines how frequently + the background worker should call this method. :rtype: int Thread Safety: - This method is thread-safe and uses multiple locks to ensure consistent - state across concurrent access to configuration data. + This method is thread-safe using atomic state updates. Multiple threads can + call this method concurrently without data corruption. The implementation uses + a single state lock with minimal critical sections to reduce lock contention. + + HTTP requests are performed outside locks to prevent blocking other threads + during potentially slow network operations. - Note: - The method automatically handles ETag-based conditional requests to - minimize unnecessary data transfer when configuration hasn't changed. + Caching Behavior: + The method automatically includes ETag headers for conditional requests to + minimize unnecessary data transfer. If the server responds with 304 Not Modified, + only the refresh interval is updated while preserving existing configuration. + + On CONFIG endpoint failures, the ETag is intentionally not updated to ensure + the next request can retry fetching the same configuration version. + + State Consistency: + All configuration state (ETag, refresh interval, version, settings) is updated + atomically using immutable state objects. 
This prevents race conditions where + different threads might observe inconsistent combinations of these values. """ query_dict = query_dict or {} headers = {} - # Prepare headers with current etag and refresh interval - with self._config_lock: - if self._etag: - headers["If-None-Match"] = self._etag - if self._refresh_interval: - headers["x-ms-onesetinterval"] = str(self._refresh_interval) + # Read current state atomically + with _ConfigurationManager._state_lock: + current_state = _ConfigurationManager._current_state + if current_state.etag: + headers["If-None-Match"] = current_state.etag + if current_state.refresh_interval: + headers["x-ms-onesetinterval"] = str(current_state.refresh_interval) # Make the OneSettings request response = make_onesettings_request(_ONE_SETTINGS_CHANGE_URL, query_dict, headers) - # Update configuration state based on response - with self._config_lock: - self._etag = response.etag - self._refresh_interval = response.refresh_interval - - # Evaluate CONFIG_VERSION to see if we need to fetch new config - if response.settings: - with self._version_lock: - if response.version is not None: - # New config published successfully, make a call to config endpoint - if response.version > self._version_cache: - # TODO: Call config endpoint to pull new config - # Update latest version - self._version_cache = response.version - elif response.version == self._version_cache: - # No new config has been published, do nothing - pass + # Prepare new state updates + new_state_updates = {} + if response.etag is not None: + new_state_updates['etag'] = response.etag + if response.refresh_interval and response.refresh_interval > 0: + new_state_updates['refresh_interval'] = response.refresh_interval # type: ignore + + if response.status_code == 304: + # Not modified: Only update etag and refresh interval below + pass + # Handle version and settings updates + elif response.settings and response.version is not None: + needs_config_fetch = False + with 
_ConfigurationManager._state_lock: + current_state = _ConfigurationManager._current_state + + if response.version > current_state.version_cache: + # Version increase: new config available + needs_config_fetch = True + elif response.version < current_state.version_cache: + # Version rollback: Erroneous state + logger.warning("Fetched version is lower than cached version. No configurations updated.") + needs_config_fetch = False + else: + # Version unchanged: No new config + needs_config_fetch = False + + # Fetch config + if needs_config_fetch: + config_response = make_onesettings_request(_ONE_SETTINGS_CONFIG_URL, query_dict) + if config_response.status_code == 200 and config_response.settings: + # Validate that the versions from change and config match + if config_response.version == response.version: + new_state_updates.update({ + 'version_cache': response.version, # type: ignore + 'settings_cache': config_response.settings # type: ignore + }) else: - # Erroneous state, should not occur under normal circumstances - logger.warning( - "Latest `CHANGE_VERSION` is less than the current stored version," \ - " no configurations updated." - ) - return self._refresh_interval + logger.warning("Version mismatch between change and config responses." \ + "No configurations updated.") + # We do not update etag to allow retry on next call + new_state_updates.pop('etag', None) + else: + logger.warning("Unexpected response status: %d", config_response.status_code) + # We do not update etag to allow retry on next call + new_state_updates.pop('etag', None) + else: + # No settings or version provided + logger.warning("No settings or version provided in config response. 
Config not updated.") + + # Atomic state update + with _ConfigurationManager._state_lock: + latest_state = _ConfigurationManager._current_state # Always use latest state + _ConfigurationManager._current_state = latest_state.with_updates(**new_state_updates) + return _ConfigurationManager._current_state.refresh_interval + + def get_settings(self) -> Dict[str, str]: # pylint: disable=C4741,C4742 + """Get current settings cache.""" + with _ConfigurationManager._state_lock: + return _ConfigurationManager._current_state.settings_cache.copy() + + def get_current_version(self) -> int: # pylint: disable=C4741,C4742 + """Get current version.""" + with _ConfigurationManager._state_lock: + return _ConfigurationManager._current_state.version_cache def shutdown(self) -> None: """Shutdown the configuration worker.""" - with self._instance_lock: + with _ConfigurationManager._instance_lock: if self._configuration_worker: self._configuration_worker.shutdown() self._configuration_worker = None - self._instance = None + if _ConfigurationManager._instance: + _ConfigurationManager._instance = None def _update_configuration_and_get_refresh_interval() -> int: diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_utils.py index 515cd13d0603..4d1a29da6549 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_utils.py @@ -24,6 +24,7 @@ class OneSettingsResponse: refresh_interval (int): Interval in seconds for the next configuration refresh settings (Dict[str, str]): Dictionary of configuration key-value pairs version (Optional[int]): Configuration version number for change tracking + status_code (int): HTTP status code from the response """ def 
__init__( @@ -31,7 +32,8 @@ def __init__( etag: Optional[str] = None, refresh_interval: int = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS, settings: Optional[Dict[str, str]] = None, - version: Optional[int] = None + version: Optional[int] = None, + status_code: int = 200 ): """Initialize OneSettingsResponse with configuration data. @@ -42,11 +44,13 @@ def __init__( settings (Optional[Dict[str, str]], optional): Configuration settings dictionary. Defaults to empty dict if None. version (Optional[int], optional): Configuration version number. Defaults to None. + status_code (int, optional): HTTP status code. Defaults to 200. """ self.etag = etag self.refresh_interval = refresh_interval self.settings = settings or {} self.version = version + self.status_code = status_code def make_onesettings_request(url: str, query_dict: Optional[Dict[str, str]] = None, @@ -115,6 +119,7 @@ def _parse_onesettings_response(response: requests.Response) -> OneSettingsRespo - refresh_interval: Next refresh interval from headers - settings: Configuration key-value pairs (empty for 304/errors) - version: Configuration version number for change tracking + - status_code: HTTP status code of the response :rtype: OneSettingsResponse Note: This function logs warnings for various error conditions but does not @@ -131,7 +136,9 @@ def _parse_onesettings_response(response: requests.Response) -> OneSettingsRespo etag = response.headers.get("ETag") refresh_interval_header = response.headers.get("x-ms-onesetinterval") try: - refresh_interval = int(refresh_interval_header) if refresh_interval_header else refresh_interval + # Note: OneSettings refresh interval is in minutes, convert to seconds + if refresh_interval_header: + refresh_interval = int(refresh_interval_header) * 60 except (ValueError, TypeError): logger.warning("Invalid refresh interval format: %s", refresh_interval_header) refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS @@ -162,4 +169,4 @@ def 
_parse_onesettings_response(response: requests.Response) -> OneSettingsRespo elif status_code == 500: logger.warning("Internal server error from OneSettings: %s", response.content) - return OneSettingsResponse(etag, refresh_interval, settings, version) + return OneSettingsResponse(etag, refresh_interval, settings, version, status_code) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_worker.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_worker.py index 79478f669bbf..dea560e8ff4e 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_worker.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_configuration/_worker.py @@ -3,6 +3,7 @@ import logging import threading +import random from azure.monitor.opentelemetry.exporter._configuration import _update_configuration_and_get_refresh_interval logger = logging.getLogger(__name__) @@ -31,15 +32,17 @@ def __init__(self, refresh_interval=None) -> None: """Initialize and start the configuration worker thread. Creates and starts a background daemon thread that will periodically refresh - configuration from OneSettings. The thread starts immediately upon initialization. + configuration from OneSettings. The thread starts immediately upon initialization + with a random startup delay to prevent thundering herd issues. Args: refresh_interval (Optional[int]): Initial refresh interval in seconds. If None, defaults to 3600 seconds (1 hour). Note: - The background thread is created as a daemon thread, which means it will - not prevent the main program from exiting. + The background thread is created as a daemon thread and includes a random + 0-15 second startup delay to stagger configuration requests across multiple + SDK instances during startup or recovery from outages. 
""" self._default_refresh_interval = 3600 # Default to 60 minutes in seconds self._interval_lock = threading.Lock() @@ -100,14 +103,14 @@ def _get_configuration(self) -> None: """Main configuration refresh loop executed in the background thread. This method implements the core logic of the configuration worker: - 1. Continuously loops until shutdown is requested - 2. Calls the configuration update function to fetch new settings - 3. Updates the refresh interval based on the server response - 4. Waits for the next refresh cycle or shutdown signal + 1. Applies random startup delay (0-15 seconds) to stagger requests + 2. Continuously loops until shutdown is requested + 3. Calls the configuration update function to fetch new settings + 4. Updates the refresh interval based on the server response + 5. Waits for the next refresh cycle or shutdown signal - The loop handles exceptions gracefully by logging warnings and continuing - operation. The wait operation uses the shutdown event to enable immediate - response to shutdown requests even during long wait periods. + The initial random delay helps prevent thundering herd problems when many + SDK instances start up simultaneously after service outages or deployments. 
Error Handling: - All exceptions are caught and logged as warnings @@ -119,6 +122,14 @@ def _get_configuration(self) -> None: - Uses _shutdown_event.wait() for interruptible sleep periods - Exits cleanly when shutdown is requested """ + # Add random startup delay (0-15 seconds) to stagger configuration requests + # This prevents thundering herd when many SDKs start simultaneously + startup_delay = random.uniform(0.0, 15.0) + + if self._shutdown_event.wait(startup_delay): + # Shutdown requested during startup delay + return + while not self._shutdown_event.is_set(): try: # Perform the refresh operation diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/configuration/test_manager.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/configuration/test_manager.py index 54583c4207d6..68484538b5de 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/configuration/test_manager.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/configuration/test_manager.py @@ -1,11 +1,11 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. 
import unittest -import threading from unittest.mock import Mock, patch from azure.monitor.opentelemetry.exporter._configuration import ( _ConfigurationManager, + _ConfigurationState, _update_configuration_and_get_refresh_interval, ) from azure.monitor.opentelemetry.exporter._configuration._utils import OneSettingsResponse @@ -13,21 +13,78 @@ _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS, _ONE_SETTINGS_PYTHON_KEY, _ONE_SETTINGS_CHANGE_URL, + _ONE_SETTINGS_CONFIG_URL, ) +class TestConfigurationState(unittest.TestCase): + """Test cases for _ConfigurationState immutable data class.""" + + def test_default_values(self): + """Test that _ConfigurationState has correct default values.""" + state = _ConfigurationState() + + self.assertEqual(state.etag, "") + self.assertEqual(state.refresh_interval, _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS) + self.assertEqual(state.version_cache, -1) + self.assertEqual(state.settings_cache, {}) + + def test_with_updates_single_field(self): + """Test updating a single field creates new state object.""" + original_state = _ConfigurationState() + updated_state = original_state.with_updates(etag="new-etag") + + # Original state unchanged + self.assertEqual(original_state.etag, "") + # New state has updated value + self.assertEqual(updated_state.etag, "new-etag") + # Other fields preserved + self.assertEqual(updated_state.refresh_interval, _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS) + self.assertEqual(updated_state.version_cache, -1) + + def test_with_updates_multiple_fields(self): + """Test updating multiple fields creates new state object.""" + original_state = _ConfigurationState() + updated_state = original_state.with_updates( + etag="test-etag", + refresh_interval=60, + version_cache=5, + settings_cache={"key": "value"} + ) + + # Original state unchanged + self.assertEqual(original_state.etag, "") + self.assertEqual(original_state.refresh_interval, _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS) + 
self.assertEqual(original_state.version_cache, -1) + self.assertEqual(original_state.settings_cache, {}) + + # New state has all updated values + self.assertEqual(updated_state.etag, "test-etag") + self.assertEqual(updated_state.refresh_interval, 60) + self.assertEqual(updated_state.version_cache, 5) + self.assertEqual(updated_state.settings_cache, {"key": "value"}) + + def test_settings_cache_isolation(self): + """Test that settings_cache is properly isolated between state objects.""" + original_state = _ConfigurationState() + original_state.settings_cache["original"] = "value" + + updated_state = original_state.with_updates(settings_cache={"new": "value"}) + + # Original and updated states should be isolated + self.assertEqual(original_state.settings_cache, {"original": "value"}) + self.assertEqual(updated_state.settings_cache, {"new": "value"}) + + class TestConfigurationManager(unittest.TestCase): """Test cases for _ConfigurationManager class.""" def setUp(self): """Reset singleton state before each test.""" - # Reset the singleton instance + # Reset the singleton instance and class variables _ConfigurationManager._instance = None _ConfigurationManager._configuration_worker = None - _ConfigurationManager._etag = None - _ConfigurationManager._refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS - _ConfigurationManager._settings_cache = {} - _ConfigurationManager._version_cache = 0 + _ConfigurationManager._current_state = _ConfigurationState() def tearDown(self): """Clean up after each test.""" @@ -37,6 +94,7 @@ def tearDown(self): # Reset singleton _ConfigurationManager._instance = None _ConfigurationManager._configuration_worker = None + _ConfigurationManager._current_state = _ConfigurationState() @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') def test_singleton_pattern(self, mock_worker_class): @@ -61,20 +119,21 @@ def test_worker_initialization(self, mock_worker_class): manager = _ConfigurationManager() 
- # Verify worker was created with correct refresh interval + # Verify worker was created with correct refresh interval (default 30) mock_worker_class.assert_called_once_with(_ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS) self.assertEqual(manager._configuration_worker, mock_worker_instance) @patch('azure.monitor.opentelemetry.exporter._configuration.make_onesettings_request') @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') - def test_get_configuration_and_refresh_interval(self, mock_worker_class, mock_request): - """Test get_configuration_and_refresh_interval method.""" - # Setup + def test_get_configuration_basic_success(self, mock_worker_class, mock_request): + """Test basic successful configuration retrieval without CONFIG fetch.""" + # Setup - Use version -1 to match initial state, no CONFIG fetch mock_response = OneSettingsResponse( etag="test-etag", - refresh_interval=1800.0, - settings={"key1": "value1"}, - version=5 + refresh_interval=1800, + settings={"key": "value"}, + version=-1, # Same as initial version, no CONFIG fetch + status_code=200 ) mock_request.return_value = mock_response @@ -83,13 +142,17 @@ def test_get_configuration_and_refresh_interval(self, mock_worker_class, mock_re # Execute result = manager.get_configuration_and_refresh_interval({"param": "value"}) - # Verify - self.assertEqual(result, 1800.0) - self.assertEqual(manager._etag, "test-etag") - self.assertEqual(manager._refresh_interval, 1800.0) - self.assertEqual(manager._version_cache, 5) + # Verify return value + self.assertEqual(result, 1800) - # Verify request was made with correct parameters + # Verify state was updated (now using class variables consistently) + current_state = _ConfigurationManager._current_state + self.assertEqual(current_state.etag, "test-etag") + self.assertEqual(current_state.refresh_interval, 1800) + self.assertEqual(current_state.version_cache, -1) # No version change + self.assertEqual(current_state.settings_cache, 
{}) # No settings update since no CONFIG fetch + + # Verify only one request was made (to CHANGE endpoint only) mock_request.assert_called_once() call_args = mock_request.call_args self.assertEqual(call_args[0][0], _ONE_SETTINGS_CHANGE_URL) # URL @@ -97,17 +160,25 @@ def test_get_configuration_and_refresh_interval(self, mock_worker_class, mock_re @patch('azure.monitor.opentelemetry.exporter._configuration.make_onesettings_request') @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') - def test_etag_headers(self, mock_worker_class, mock_request): + def test_etag_headers_included(self, mock_worker_class, mock_request): """Test that etag is included in request headers.""" # Setup - first call sets etag - mock_response1 = OneSettingsResponse(etag="test-etag", refresh_interval=1800.0) + mock_response1 = OneSettingsResponse( + etag="test-etag", + refresh_interval=1800, + status_code=200 + ) mock_request.return_value = mock_response1 manager = _ConfigurationManager() manager.get_configuration_and_refresh_interval() # Setup - second call should include etag - mock_response2 = OneSettingsResponse(etag="new-etag", refresh_interval=2400.0) + mock_response2 = OneSettingsResponse( + etag="new-etag", + refresh_interval=2400, + status_code=200 + ) mock_request.return_value = mock_response2 # Execute second call @@ -118,108 +189,277 @@ def test_etag_headers(self, mock_worker_class, mock_request): second_call_args = mock_request.call_args headers = second_call_args[0][2] # headers parameter self.assertEqual(headers["If-None-Match"], "test-etag") - self.assertEqual(headers["x-ms-onesetinterval"], "1800.0") + self.assertEqual(headers["x-ms-onesetinterval"], "1800") @patch('azure.monitor.opentelemetry.exporter._configuration.make_onesettings_request') @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') - def test_version_cache_logic(self, mock_worker_class, mock_request): - """Test version cache update 
logic.""" + def test_version_increase_triggers_config_fetch(self, mock_worker_class, mock_request): + """Test that version increase triggers CONFIG endpoint fetch.""" manager = _ConfigurationManager() - # Test version increase (should update cache) + # Mock responses for CHANGE and CONFIG endpoints + change_response = OneSettingsResponse( + etag="test-etag", + refresh_interval=1800, + settings={"key": "value"}, + version=5, + status_code=200 + ) + config_response = OneSettingsResponse( + settings={"key": "config_value"}, + version=5, + status_code=200 + ) + + # Configure mock to return different responses for different URLs + def mock_request_side_effect(url, query_dict, headers=None): + if url == _ONE_SETTINGS_CHANGE_URL: + return change_response + elif url == _ONE_SETTINGS_CONFIG_URL: + return config_response + return OneSettingsResponse() + + mock_request.side_effect = mock_request_side_effect + + # Execute + result = manager.get_configuration_and_refresh_interval() + + # Verify both endpoints were called + self.assertEqual(mock_request.call_count, 2) + + # Verify first call was to CHANGE endpoint + first_call = mock_request.call_args_list[0] + self.assertEqual(first_call[0][0], _ONE_SETTINGS_CHANGE_URL) + + # Verify second call was to CONFIG endpoint + second_call = mock_request.call_args_list[1] + self.assertEqual(second_call[0][0], _ONE_SETTINGS_CONFIG_URL) + + # Verify state was updated with CONFIG response + current_state = _ConfigurationManager._current_state + self.assertEqual(current_state.version_cache, 5) + self.assertEqual(current_state.settings_cache, {"key": "config_value"}) + self.assertEqual(result, 1800) + + @patch('azure.monitor.opentelemetry.exporter._configuration.make_onesettings_request') + @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') + def test_version_same_no_config_fetch(self, mock_worker_class, mock_request): + """Test that same version does not trigger CONFIG fetch.""" + manager = 
_ConfigurationManager() + + # Set initial version using class variable + _ConfigurationManager._current_state = _ConfigurationManager._current_state.with_updates(version_cache=5) + + # Mock response with same version mock_response = OneSettingsResponse( + etag="test-etag", + refresh_interval=1800, settings={"key": "value"}, - version=5 + version=5, + status_code=200 ) mock_request.return_value = mock_response + # Execute manager.get_configuration_and_refresh_interval() - self.assertEqual(manager._version_cache, 5) - # Test same version (should not change cache) + # Verify only one call was made (to CHANGE endpoint) + mock_request.assert_called_once() + self.assertEqual(mock_request.call_args[0][0], _ONE_SETTINGS_CHANGE_URL) + + @patch('azure.monitor.opentelemetry.exporter._configuration.make_onesettings_request') + @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') + @patch('azure.monitor.opentelemetry.exporter._configuration.logger') + def test_version_decrease_warning(self, mock_logger, mock_worker_class, mock_request): + """Test warning when version decreases.""" + manager = _ConfigurationManager() + + # Set initial version using class variable + _ConfigurationManager._current_state = _ConfigurationManager._current_state.with_updates(version_cache=10) + + # Mock response with decreased version mock_response = OneSettingsResponse( + etag="test-etag", + refresh_interval=1800, settings={"key": "value"}, - version=5 + version=5, + status_code=200 ) mock_request.return_value = mock_response + # Execute manager.get_configuration_and_refresh_interval() - self.assertEqual(manager._version_cache, 5) - # Test version increase again + # Verify warning was logged + mock_logger.warning.assert_called_once() + warning_message = mock_logger.warning.call_args[0][0] + self.assertIn("lower than cached version", warning_message) + + # Version cache should not be updated + self.assertEqual(_ConfigurationManager._current_state.version_cache, 
10) + + @patch('azure.monitor.opentelemetry.exporter._configuration.make_onesettings_request') + @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') + def test_304_not_modified_response(self, mock_worker_class, mock_request): + """Test handling of 304 Not Modified response.""" + manager = _ConfigurationManager() + + # Set initial state using class variable + _ConfigurationManager._current_state = _ConfigurationManager._current_state.with_updates( + etag="old-etag", + refresh_interval=1800, + version_cache=5, + settings_cache={"existing": "value"} + ) + + # Mock 304 response mock_response = OneSettingsResponse( - settings={"key": "value"}, - version=7 + etag="new-etag", + refresh_interval=2400, + status_code=304 ) mock_request.return_value = mock_response - manager.get_configuration_and_refresh_interval() - self.assertEqual(manager._version_cache, 7) + # Execute + result = manager.get_configuration_and_refresh_interval() + + # Verify etag and refresh interval updated, but settings/version preserved + current_state = _ConfigurationManager._current_state + self.assertEqual(current_state.etag, "new-etag") + self.assertEqual(current_state.refresh_interval, 2400) + self.assertEqual(current_state.version_cache, 5) # Preserved + self.assertEqual(current_state.settings_cache, {"existing": "value"}) # Preserved + self.assertEqual(result, 2400) @patch('azure.monitor.opentelemetry.exporter._configuration.make_onesettings_request') @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') @patch('azure.monitor.opentelemetry.exporter._configuration.logger') - def test_version_decrease_warning(self, mock_logger, mock_worker_class, mock_request): - """Test warning when version decreases.""" + def test_config_endpoint_failure_preserves_etag(self, mock_logger, mock_worker_class, mock_request): + """Test that CONFIG endpoint failure preserves ETag for retry.""" manager = _ConfigurationManager() - # Set initial 
version - mock_response = OneSettingsResponse( + # Mock responses + change_response = OneSettingsResponse( + etag="test-etag", + refresh_interval=1800, settings={"key": "value"}, - version=10 + version=5, + status_code=200 ) - mock_request.return_value = mock_response + config_response = OneSettingsResponse( + status_code=500 # Server error + ) + + def mock_request_side_effect(url, query_dict, headers=None): + if url == _ONE_SETTINGS_CHANGE_URL: + return change_response + elif url == _ONE_SETTINGS_CONFIG_URL: + return config_response + return OneSettingsResponse() + + mock_request.side_effect = mock_request_side_effect + + # Execute manager.get_configuration_and_refresh_interval() - # Test version decrease (should log warning) - mock_response = OneSettingsResponse( + # Verify warning was logged + mock_logger.warning.assert_called() + + # Verify ETag was not updated due to CONFIG failure + current_state = _ConfigurationManager._current_state + self.assertEqual(current_state.etag, "") # Should remain empty + self.assertEqual(current_state.refresh_interval, 1800) # Should still be updated + + @patch('azure.monitor.opentelemetry.exporter._configuration.make_onesettings_request') + @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') + @patch('azure.monitor.opentelemetry.exporter._configuration.logger') + def test_version_mismatch_between_endpoints(self, mock_logger, mock_worker_class, mock_request): + """Test handling of version mismatch between CHANGE and CONFIG endpoints.""" + manager = _ConfigurationManager() + + # Mock responses with mismatched versions + change_response = OneSettingsResponse( + etag="test-etag", + refresh_interval=1800, settings={"key": "value"}, - version=5 + version=5, + status_code=200 ) - mock_request.return_value = mock_response + config_response = OneSettingsResponse( + settings={"key": "config_value"}, + version=6, # Different version! 
+ status_code=200 + ) + + def mock_request_side_effect(url, query_dict, headers=None): + if url == _ONE_SETTINGS_CHANGE_URL: + return change_response + elif url == _ONE_SETTINGS_CONFIG_URL: + return config_response + return OneSettingsResponse() + mock_request.side_effect = mock_request_side_effect + + # Execute manager.get_configuration_and_refresh_interval() # Verify warning was logged - mock_logger.warning.assert_called_once() + mock_logger.warning.assert_called() warning_message = mock_logger.warning.call_args[0][0] - self.assertIn("CHANGE_VERSION", warning_message) - self.assertIn("less than", warning_message) + self.assertIn("Version mismatch", warning_message) - # Version cache should not be updated - self.assertEqual(manager._version_cache, 10) + # Verify ETag was not updated due to version mismatch + current_state = _ConfigurationManager._current_state + self.assertEqual(current_state.etag, "") # Should remain empty + self.assertEqual(current_state.version_cache, -1) # Should not be updated - @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') - def test_get_python_configuration_call(self, mock_worker_class): - """Test _update_configuration_and_get_refresh_interval function calls manager correctly.""" - # Configure the mock worker - mock_worker_instance = Mock() - mock_worker_class.return_value = mock_worker_instance + def test_get_settings(self): + """Test get_settings returns copy of settings cache.""" + manager = _ConfigurationManager() + + # Set some settings using class variable + test_settings = {"key1": "value1", "key2": "value2"} + _ConfigurationManager._current_state = _ConfigurationManager._current_state.with_updates(settings_cache=test_settings) + + # Get settings + returned_settings = manager.get_settings() + + # Verify correct settings returned + self.assertEqual(returned_settings, test_settings) + + # Verify it's a copy (modifying returned dict doesn't affect internal state) + returned_settings["key3"] = 
"value3" + self.assertEqual(_ConfigurationManager._current_state.settings_cache, test_settings) + + def test_get_current_version(self): + """Test get_current_version returns current version.""" + manager = _ConfigurationManager() + + # Set version using class variable + _ConfigurationManager._current_state = _ConfigurationManager._current_state.with_updates(version_cache=42) - with patch.object(_ConfigurationManager, 'get_configuration_and_refresh_interval') as mock_get_config: - mock_get_config.return_value = 1800.0 - - # Call the global function - result = _update_configuration_and_get_refresh_interval() - - # Verify it called the manager with correct parameters - mock_get_config.assert_called_once_with({"namespaces": _ONE_SETTINGS_PYTHON_KEY}) - self.assertEqual(result, 1800.0) + # Verify version returned + self.assertEqual(manager.get_current_version(), 42) @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') def test_shutdown(self, mock_worker_class): - """Test shutdown method.""" + """Test shutdown properly cleans up worker and instance.""" mock_worker_instance = Mock() mock_worker_class.return_value = mock_worker_instance manager = _ConfigurationManager() - # Call shutdown + # Verify worker exists + self.assertIsNotNone(manager._configuration_worker) + self.assertIsNotNone(_ConfigurationManager._instance) + + # Shutdown manager.shutdown() - # Verify worker shutdown was called + # Verify cleanup mock_worker_instance.shutdown.assert_called_once() - self.assertIsNone(manager._instance) + self.assertIsNone(_ConfigurationManager._configuration_worker) + self.assertIsNone(_ConfigurationManager._instance) class TestUpdateConfigurationFunction(unittest.TestCase): @@ -229,6 +469,7 @@ def setUp(self): """Reset singleton state before each test.""" _ConfigurationManager._instance = None _ConfigurationManager._configuration_worker = None + _ConfigurationManager._current_state = _ConfigurationState() def tearDown(self): """Clean up 
after each test.""" @@ -236,18 +477,21 @@ def tearDown(self): _ConfigurationManager._instance.shutdown() _ConfigurationManager._instance = None _ConfigurationManager._configuration_worker = None + _ConfigurationManager._current_state = _ConfigurationState() @patch.object(_ConfigurationManager, 'get_configuration_and_refresh_interval') @patch('azure.monitor.opentelemetry.exporter._configuration._worker._ConfigurationWorker') def test_update_configuration_function(self, mock_worker_class, mock_get_config): - """Test _update_configuration_and_get_refresh_interval function.""" - mock_get_config.return_value = 2400.0 + """Test _update_configuration_and_get_refresh_interval function calls manager correctly.""" + # Setup + mock_get_config.return_value = 3600 + # Execute result = _update_configuration_and_get_refresh_interval() - # Verify the function called the manager with correct parameters + # Verify + self.assertEqual(result, 3600) mock_get_config.assert_called_once_with({"namespaces": _ONE_SETTINGS_PYTHON_KEY}) - self.assertEqual(result, 2400.0) if __name__ == '__main__': diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/configuration/test_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/configuration/test_utils.py index 8fdc5613bec2..d28766a023d8 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/configuration/test_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/configuration/test_utils.py @@ -27,20 +27,23 @@ def test_init_default_values(self): self.assertEqual(response.refresh_interval, _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS) self.assertEqual(response.settings, {}) self.assertIsNone(response.version) + self.assertEqual(response.status_code, 200) def test_init_custom_values(self): """Test OneSettingsResponse initialization with custom values.""" etag = "test-etag" - refresh_interval = 1800.0 + refresh_interval = 1800 settings = {"key1": "value1", "key2": "value2"} version = 5 + 
status_code = 304 - response = OneSettingsResponse(etag, refresh_interval, settings, version) + response = OneSettingsResponse(etag, refresh_interval, settings, version, status_code) self.assertEqual(response.etag, etag) self.assertEqual(response.refresh_interval, refresh_interval) self.assertEqual(response.settings, settings) self.assertEqual(response.version, version) + self.assertEqual(response.status_code, status_code) def test_init_empty_settings_dict(self): """Test OneSettingsResponse handles None settings correctly.""" @@ -48,6 +51,12 @@ def test_init_empty_settings_dict(self): self.assertEqual(response.settings, {}) + def test_init_custom_status_code(self): + """Test OneSettingsResponse with custom status code.""" + response = OneSettingsResponse(status_code=404) + + self.assertEqual(response.status_code, 404) + class TestMakeOneSettingsRequest(unittest.TestCase): """Test cases for make_onesettings_request function.""" @@ -59,7 +68,7 @@ def test_successful_request(self, mock_parse, mock_get): # Setup mock_response = Mock() mock_get.return_value = mock_response - expected_response = OneSettingsResponse(etag="test-etag") + expected_response = OneSettingsResponse(etag="test-etag", status_code=200) mock_parse.return_value = expected_response url = "https://test.example.com" @@ -104,6 +113,7 @@ def test_request_exception_handling(self, mock_get): self.assertIsInstance(result, OneSettingsResponse) self.assertIsNone(result.etag) self.assertEqual(result.settings, {}) + self.assertEqual(result.status_code, 200) # Default status code mock_logger.warning.assert_called_once() @patch('azure.monitor.opentelemetry.exporter._configuration._utils.requests.get') @@ -123,6 +133,7 @@ def test_json_decode_error_handling(self, mock_get): # Verify self.assertIsInstance(result, OneSettingsResponse) + self.assertEqual(result.status_code, 200) # Default status code mock_logger.warning.assert_called_once() 
@patch('azure.monitor.opentelemetry.exporter._configuration._utils.requests.get') @@ -137,6 +148,7 @@ def test_general_exception_handling(self, mock_get): # Verify self.assertIsInstance(result, OneSettingsResponse) + self.assertEqual(result.status_code, 200) # Default status code mock_logger.warning.assert_called_once() @@ -147,11 +159,10 @@ def test_parse_200_response_with_settings(self): """Test parsing 200 response with valid settings.""" # Setup mock_response = Mock() - mock_response.__bool__ = Mock(return_value=True) mock_response.status_code = 200 mock_response.headers = { "ETag": "test-etag-123", - "x-ms-onesetinterval": "1800" + "x-ms-onesetinterval": "60" # Onesettings returns interval in minutes } settings_data = { @@ -167,15 +178,15 @@ def test_parse_200_response_with_settings(self): # Verify self.assertEqual(result.etag, "test-etag-123") - self.assertEqual(result.refresh_interval, 1800.0) + self.assertEqual(result.refresh_interval, 3600) self.assertEqual(result.settings, settings_data["settings"]) self.assertEqual(result.version, 5) + self.assertEqual(result.status_code, 200) def test_parse_304_response(self): """Test parsing 304 Not Modified response.""" # Setup mock_response = Mock() - mock_response.__bool__ = Mock(return_value=True) mock_response.status_code = 304 mock_response.headers = {"ETag": "test-etag-123"} mock_response.content = b"" @@ -187,6 +198,7 @@ def test_parse_304_response(self): self.assertEqual(result.etag, "test-etag-123") self.assertEqual(result.settings, {}) self.assertIsNone(result.version) + self.assertEqual(result.status_code, 304) def test_parse_error_status_codes(self): """Test parsing various error status codes.""" @@ -195,7 +207,6 @@ def test_parse_error_status_codes(self): for status_code in error_codes: with self.subTest(status_code=status_code): mock_response = Mock() - mock_response.__bool__ = Mock(return_value=True) mock_response.status_code = status_code mock_response.headers = {} mock_response.content = b"Error 
content" @@ -205,13 +216,13 @@ def test_parse_error_status_codes(self): self.assertIsInstance(result, OneSettingsResponse) self.assertEqual(result.settings, {}) + self.assertEqual(result.status_code, status_code) mock_logger.warning.assert_called_once() def test_parse_invalid_refresh_interval(self): """Test parsing response with invalid refresh interval.""" # Setup mock_response = Mock() - mock_response.__bool__ = Mock(return_value=True) mock_response.status_code = 200 mock_response.headers = {"x-ms-onesetinterval": "invalid-number"} mock_response.content = b"" @@ -222,13 +233,13 @@ def test_parse_invalid_refresh_interval(self): # Verify self.assertEqual(result.refresh_interval, _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS) + self.assertEqual(result.status_code, 200) mock_logger.warning.assert_called_once() def test_parse_invalid_json_content(self): """Test parsing response with invalid JSON content.""" # Setup mock_response = Mock() - mock_response.__bool__ = Mock(return_value=True) mock_response.status_code = 200 mock_response.headers = {} mock_response.content = b"invalid json content" @@ -239,13 +250,13 @@ def test_parse_invalid_json_content(self): # Verify self.assertEqual(result.settings, {}) + self.assertEqual(result.status_code, 200) mock_logger.warning.assert_called_once() def test_parse_invalid_version_format(self): """Test parsing response with invalid version format.""" # Setup mock_response = Mock() - mock_response.__bool__ = Mock(return_value=True) mock_response.status_code = 200 mock_response.headers = {} @@ -264,13 +275,13 @@ def test_parse_invalid_version_format(self): # Verify self.assertEqual(result.settings, settings_data["settings"]) self.assertIsNone(result.version) + self.assertEqual(result.status_code, 200) mock_logger.warning.assert_called_once() def test_parse_unicode_decode_error(self): """Test parsing response with unicode decode error.""" # Setup mock_response = Mock() - mock_response.__bool__ = Mock(return_value=True) 
mock_response.status_code = 200 mock_response.headers = {} mock_response.content = b'\x80\x81\x82' # Invalid UTF-8 @@ -281,13 +292,13 @@ def test_parse_unicode_decode_error(self): # Verify self.assertEqual(result.settings, {}) + self.assertEqual(result.status_code, 200) mock_logger.warning.assert_called_once() def test_parse_no_headers(self): """Test parsing response with no headers.""" # Setup mock_response = Mock() - mock_response.__bool__ = Mock(return_value=True) mock_response.status_code = 200 mock_response.headers = None mock_response.content = b"" @@ -298,12 +309,12 @@ def test_parse_no_headers(self): # Verify self.assertIsNone(result.etag) self.assertEqual(result.refresh_interval, _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS) + self.assertEqual(result.status_code, 200) def test_parse_settings_without_version(self): """Test parsing response with settings but no version key.""" # Setup mock_response = Mock() - mock_response.__bool__ = Mock(return_value=True) mock_response.status_code = 200 mock_response.headers = {} @@ -321,6 +332,7 @@ def test_parse_settings_without_version(self): # Verify self.assertEqual(result.settings, settings_data["settings"]) self.assertIsNone(result.version) + self.assertEqual(result.status_code, 200) if __name__ == '__main__':