Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@

### Other Changes

- Configuration manager/worker fetch via OneSettings part 1 - Change detection
([#42360](https://github.com/Azure/azure-sdk-for-python/pull/42360))
- Configuration manager/worker fetch via OneSettings part 2 - Concurrency and refactoring of _ConfigurationManager
([#42508](https://github.com/Azure/azure-sdk-for-python/pull/42508))

## 1.0.0b41 (2025-07-31)

### Features Added

- Configuration manager/worker fetch via OneSettings part 1
([#42360](https://github.com/Azure/azure-sdk-for-python/pull/42360))
- Added RateLimited Sampler
([#41954](https://github.com/Azure/azure-sdk-for-python/pull/41954))
- Refactored Application Insights Sampler Code
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from dataclasses import dataclass, field
from typing import Dict, Optional
import logging
from threading import Lock
Expand All @@ -8,26 +9,40 @@
_ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS,
_ONE_SETTINGS_PYTHON_KEY,
_ONE_SETTINGS_CHANGE_URL,
_ONE_SETTINGS_CONFIG_URL,
)
from azure.monitor.opentelemetry.exporter._configuration._utils import make_onesettings_request

# Set up logger
logger = logging.getLogger(__name__)


@dataclass
class _ConfigurationState:
    """Snapshot of the OneSettings configuration state.

    Instances are treated as immutable by convention: fields are not meant to
    be reassigned in place. To change anything, build a replacement snapshot
    with :meth:`with_updates`. The dataclass is not declared ``frozen``, so
    this convention is not enforced at runtime.
    """

    # ETag value; when non-empty the manager sends it as "If-None-Match".
    etag: str = ""
    # Refresh interval handed to the configuration worker and returned to callers.
    refresh_interval: int = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
    # Last accepted configuration version; -1 until a version has been stored.
    version_cache: int = -1
    # Cached configuration key/value pairs.
    settings_cache: Dict[str, str] = field(default_factory=dict)

    def with_updates(self, **kwargs) -> '_ConfigurationState':  # pylint: disable=C4741,C4742
        """Create a new state object with updated values.

        Only the keys ``etag``, ``refresh_interval``, ``version_cache`` and
        ``settings_cache`` are honoured; any other keyword is ignored. Fields
        that are not supplied keep the value from this snapshot.

        The settings dictionary stored in the new snapshot is always a fresh
        shallow copy, whether it comes from this snapshot or from the caller,
        so later mutation of either source dict cannot leak into the new state.
        A ``settings_cache`` of ``None`` is stored as an empty dict.

        :return: A new snapshot; ``self`` is left untouched.
        :rtype: _ConfigurationState
        """
        # Look up the source dict first and copy exactly once, instead of
        # eagerly copying self.settings_cache even when it will be discarded.
        settings = kwargs.get('settings_cache', self.settings_cache)
        return _ConfigurationState(
            etag=kwargs.get('etag', self.etag),
            refresh_interval=kwargs.get('refresh_interval', self.refresh_interval),
            version_cache=kwargs.get('version_cache', self.version_cache),
            settings_cache=dict(settings) if settings is not None else {},
        )


class _ConfigurationManager:
"""Singleton class to manage configuration settings."""

_instance = None
_configuration_worker = None
_instance_lock = Lock()
_config_lock = Lock()
_settings_lock = Lock()
_version_lock = Lock()
_etag = None
_refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
_settings_cache: Dict[str, str] = {}
_version_cache = 0
_state_lock = Lock() # Single lock for all state
_current_state = _ConfigurationState()

def __new__(cls):
with cls._instance_lock:
Expand All @@ -41,81 +56,156 @@ def _initialize_worker(self):
"""Initialize the ConfigurationManager and start the configuration worker."""
# Lazy import to avoid circular import
from azure.monitor.opentelemetry.exporter._configuration._worker import _ConfigurationWorker
self._configuration_worker = _ConfigurationWorker(self._refresh_interval)

# Get initial refresh interval from state
with _ConfigurationManager._state_lock:
initial_refresh_interval = _ConfigurationManager._current_state.refresh_interval

self._configuration_worker = _ConfigurationWorker(initial_refresh_interval)

def get_configuration_and_refresh_interval(self, query_dict: Optional[Dict[str, str]] = None) -> int:
"""Fetch configuration from OneSettings and update local cache.
"""Fetch configuration from OneSettings and update local cache atomically.

This method performs a conditional HTTP request to OneSettings using the
current ETag for efficient caching. It updates the local configuration
cache with any new settings and manages version tracking for change detection.
current ETag for efficient caching. It atomically updates the local configuration
state with any new settings and manages version tracking for change detection.

The method implements a check-and-set pattern for thread safety:
1. Reads current state atomically to prepare request headers
2. Makes HTTP request to OneSettings CHANGE endpoint outside locks
3. Re-reads current state to make version comparison decisions
4. Conditionally fetches from CONFIG endpoint if version increased
5. Updates all state fields atomically in a single operation

Version comparison logic:
- Version increase: New configuration available, fetches and caches new settings
- Version same: No changes detected, ETag and refresh interval updated safely
- Version decrease: Unexpected rollback state, logged as warning, no updates applied

The method handles version comparison logic:
- Version increase: New configuration available, cache updated
- Version same: No changes, cache remains unchanged
- Version decrease: Unexpected state, logged as warning

:param query_dict: Optional query parameters to include
in the OneSettings request. Commonly used for targeting specific
configuration namespaces or environments.
Error handling:
- CONFIG endpoint failure: ETag not updated to preserve retry capability on next call
- Network failures: Handled by make_onesettings_request, returns default values
- Missing settings/version: Logged as warning, only ETag and refresh interval updated

:param query_dict: Optional query parameters to include in the OneSettings request.
Commonly used for targeting specific configuration namespaces or environments.
If None, defaults to empty dictionary.
:type query_dict: Optional[Dict[str, str]]

:return: Updated refresh interval in seconds for the next configuration check.
This value comes from the OneSettings response and determines how frequently
the background worker should call this method.
:rtype: int

Thread Safety:
This method is thread-safe and uses multiple locks to ensure consistent
state across concurrent access to configuration data.
This method is thread-safe using atomic state updates. Multiple threads can
call this method concurrently without data corruption. The implementation uses
a single state lock with minimal critical sections to reduce lock contention.

HTTP requests are performed outside locks to prevent blocking other threads
during potentially slow network operations.

Note:
The method automatically handles ETag-based conditional requests to
minimize unnecessary data transfer when configuration hasn't changed.
Caching Behavior:
The method automatically includes ETag headers for conditional requests to
minimize unnecessary data transfer. If the server responds with 304 Not Modified,
only the refresh interval is updated while preserving existing configuration.

On CONFIG endpoint failures, the ETag is intentionally not updated to ensure
the next request can retry fetching the same configuration version.

State Consistency:
All configuration state (ETag, refresh interval, version, settings) is updated
atomically using immutable state objects. This prevents race conditions where
different threads might observe inconsistent combinations of these values.
"""
query_dict = query_dict or {}
headers = {}

# Prepare headers with current etag and refresh interval
with self._config_lock:
if self._etag:
headers["If-None-Match"] = self._etag
if self._refresh_interval:
headers["x-ms-onesetinterval"] = str(self._refresh_interval)
# Read current state atomically
with _ConfigurationManager._state_lock:
current_state = _ConfigurationManager._current_state
if current_state.etag:
headers["If-None-Match"] = current_state.etag
if current_state.refresh_interval:
headers["x-ms-onesetinterval"] = str(current_state.refresh_interval)

# Make the OneSettings request
response = make_onesettings_request(_ONE_SETTINGS_CHANGE_URL, query_dict, headers)

# Update configuration state based on response
with self._config_lock:
self._etag = response.etag
self._refresh_interval = response.refresh_interval

# Evaluate CONFIG_VERSION to see if we need to fetch new config
if response.settings:
with self._version_lock:
if response.version is not None:
# New config published successfully, make a call to config endpoint
if response.version > self._version_cache:
# TODO: Call config endpoint to pull new config
# Update latest version
self._version_cache = response.version
elif response.version == self._version_cache:
# No new config has been published, do nothing
pass
# Prepare new state updates
new_state_updates = {}
if response.etag is not None:
new_state_updates['etag'] = response.etag
if response.refresh_interval and response.refresh_interval > 0:
new_state_updates['refresh_interval'] = response.refresh_interval # type: ignore

if response.status_code == 304:
# Not modified: Only update etag and refresh interval below
pass
# Handle version and settings updates
elif response.settings and response.version is not None:
needs_config_fetch = False
with _ConfigurationManager._state_lock:
current_state = _ConfigurationManager._current_state

if response.version > current_state.version_cache:
# Version increase: new config available
needs_config_fetch = True
elif response.version < current_state.version_cache:
# Version rollback: Erroneous state
logger.warning("Fetched version is lower than cached version. No configurations updated.")
needs_config_fetch = False
else:
# Version unchanged: No new config
needs_config_fetch = False

# Fetch config
if needs_config_fetch:
config_response = make_onesettings_request(_ONE_SETTINGS_CONFIG_URL, query_dict)
if config_response.status_code == 200 and config_response.settings:
# Validate that the versions from change and config match
if config_response.version == response.version:
new_state_updates.update({
'version_cache': response.version, # type: ignore
'settings_cache': config_response.settings # type: ignore
})
else:
# Erroneous state, should not occur under normal circumstances
logger.warning(
"Latest `CHANGE_VERSION` is less than the current stored version," \
" no configurations updated."
)
return self._refresh_interval
logger.warning("Version mismatch between change and config responses." \
"No configurations updated.")
# We do not update etag to allow retry on next call
new_state_updates.pop('etag', None)
else:
logger.warning("Unexpected response status: %d", config_response.status_code)
# We do not update etag to allow retry on next call
new_state_updates.pop('etag', None)
else:
# No settings or version provided
logger.warning("No settings or version provided in config response. Config not updated.")

# Atomic state update
with _ConfigurationManager._state_lock:
latest_state = _ConfigurationManager._current_state # Always use latest state
_ConfigurationManager._current_state = latest_state.with_updates(**new_state_updates)
return _ConfigurationManager._current_state.refresh_interval

def get_settings(self) -> Dict[str, str]: # pylint: disable=C4741,C4742
"""Get current settings cache."""
with _ConfigurationManager._state_lock:
return _ConfigurationManager._current_state.settings_cache.copy()

def get_current_version(self) -> int: # pylint: disable=C4741,C4742
"""Get current version."""
with _ConfigurationManager._state_lock:
return _ConfigurationManager._current_state.version_cache

def shutdown(self) -> None:
"""Shutdown the configuration worker."""
with self._instance_lock:
with _ConfigurationManager._instance_lock:
if self._configuration_worker:
self._configuration_worker.shutdown()
self._configuration_worker = None
self._instance = None
if _ConfigurationManager._instance:
_ConfigurationManager._instance = None


def _update_configuration_and_get_refresh_interval() -> int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ class OneSettingsResponse:
refresh_interval (int): Interval in seconds for the next configuration refresh
settings (Dict[str, str]): Dictionary of configuration key-value pairs
version (Optional[int]): Configuration version number for change tracking
status_code (int): HTTP status code from the response
"""

def __init__(
self,
etag: Optional[str] = None,
refresh_interval: int = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS,
settings: Optional[Dict[str, str]] = None,
version: Optional[int] = None
version: Optional[int] = None,
status_code: int = 200
):
"""Initialize OneSettingsResponse with configuration data.

Expand All @@ -42,11 +44,13 @@ def __init__(
settings (Optional[Dict[str, str]], optional): Configuration settings dictionary.
Defaults to empty dict if None.
version (Optional[int], optional): Configuration version number. Defaults to None.
status_code (int, optional): HTTP status code. Defaults to 200.
"""
self.etag = etag
self.refresh_interval = refresh_interval
self.settings = settings or {}
self.version = version
self.status_code = status_code


def make_onesettings_request(url: str, query_dict: Optional[Dict[str, str]] = None,
Expand Down Expand Up @@ -115,6 +119,7 @@ def _parse_onesettings_response(response: requests.Response) -> OneSettingsRespo
- refresh_interval: Next refresh interval from headers
- settings: Configuration key-value pairs (empty for 304/errors)
- version: Configuration version number for change tracking
- status_code: HTTP status code of the response
:rtype: OneSettingsResponse
Note:
This function logs warnings for various error conditions but does not
Expand All @@ -131,7 +136,9 @@ def _parse_onesettings_response(response: requests.Response) -> OneSettingsRespo
etag = response.headers.get("ETag")
refresh_interval_header = response.headers.get("x-ms-onesetinterval")
try:
refresh_interval = int(refresh_interval_header) if refresh_interval_header else refresh_interval
# Note: OneSettings refresh interval is in minutes, convert to seconds
if refresh_interval_header:
refresh_interval = int(refresh_interval_header) * 60
except (ValueError, TypeError):
logger.warning("Invalid refresh interval format: %s", refresh_interval_header)
refresh_interval = _ONE_SETTINGS_DEFAULT_REFRESH_INTERVAL_SECONDS
Expand Down Expand Up @@ -162,4 +169,4 @@ def _parse_onesettings_response(response: requests.Response) -> OneSettingsRespo
elif status_code == 500:
logger.warning("Internal server error from OneSettings: %s", response.content)

return OneSettingsResponse(etag, refresh_interval, settings, version)
return OneSettingsResponse(etag, refresh_interval, settings, version, status_code)
Loading