diff --git a/msal/application.py b/msal/application.py index 62fc6cb8..61e61580 100644 --- a/msal/application.py +++ b/msal/application.py @@ -13,7 +13,7 @@ from .oauth2cli import Client, JwtAssertionCreator from .oauth2cli.oidc import decode_part -from .authority import Authority +from .authority import Authority, WORLD_WIDE from .mex import send_request as mex_send_request from .wstrust_request import send_request as wst_send_request from .wstrust_response import * @@ -146,7 +146,6 @@ def obtain_token_by_username_password(self, username, password, **kwargs): class ClientApplication(object): - ACQUIRE_TOKEN_SILENT_ID = "84" ACQUIRE_TOKEN_BY_REFRESH_TOKEN = "85" ACQUIRE_TOKEN_BY_USERNAME_PASSWORD_ID = "301" @@ -174,6 +173,7 @@ def __init__( # when we would eventually want to add this feature to PCA in future. exclude_scopes=None, http_cache=None, + instance_discovery=None, ): """Create an instance of application. @@ -409,11 +409,40 @@ def __init__( Personally Identifiable Information (PII). Encryption is unnecessary. New in version 1.16.0. + + :param boolean instance_discovery: + Historically, MSAL would connect to a central endpoint located at + ``https://login.microsoftonline.com`` to acquire some metadata, + especially when using an unfamiliar authority. + This behavior is known as Instance Discovery. + + This parameter defaults to None, which enables the Instance Discovery. + + If you know some authorities which you allow MSAL to operate with as-is, + without involving any Instance Discovery, the recommended pattern is:: + + known_authorities = frozenset([ # Treat your known authorities as const + "https://contoso.com/adfs", "https://login.azs/foo"]) + ... + authority = "https://contoso.com/adfs" # Assuming your app will use this + app1 = PublicClientApplication( + "client_id", + authority=authority, + # Conditionally disable Instance Discovery for known authorities + instance_discovery=authority not in known_authorities, + ) + + If you do not know some authorities beforehand, + yet still want MSAL to accept any authority that you will provide, + you can use a ``False`` to unconditionally disable Instance Discovery. + + New in version 1.19.0. """ self.client_id = client_id self.client_credential = client_credential self.client_claims = client_claims self._client_capabilities = client_capabilities + self._instance_discovery = instance_discovery if exclude_scopes and not isinstance(exclude_scopes, list): raise ValueError( @@ -453,9 +482,13 @@ def __init__( # Here the self.authority will not be the same type as authority in input try: + authority_to_use = authority or "https://{}/common/".format(WORLD_WIDE) self.authority = Authority( - authority or "https://login.microsoftonline.com/common/", - self.http_client, validate_authority=validate_authority) + authority_to_use, + self.http_client, + validate_authority=validate_authority, + instance_discovery=self._instance_discovery, + ) except ValueError: # Those are explicit authority validation errors raise except Exception: # The rest are typically connection errors @@ -463,8 +496,10 @@ def __init__( # Since caller opts in to use region, here we tolerate connection # errors happened during authority validation at non-region endpoint self.authority = Authority( - authority or "https://login.microsoftonline.com/common/", - self.http_client, validate_authority=False) + authority_to_use, + self.http_client, + instance_discovery=False, + ) else: raise @@ -534,10 +569,11 @@ def _get_regional_authority(self, central_authority): "sts.windows.net", ) else "{}.{}".format(region_to_use, central_authority.instance)) - return Authority( + return Authority( # The central_authority has already been validated "https://{}/{}".format(regional_host, central_authority.tenant), self.http_client, - validate_authority=False) # The central_authority has already been validated + instance_discovery=False, + ) return None def _build_client(self, client_credential, authority, skip_regional_client=False): @@ -789,7 +825,8 @@ def get_authorization_request_url( # Multi-tenant app can use new authority on demand the_authority = Authority( authority, - self.http_client + self.http_client, + instance_discovery=self._instance_discovery, ) if authority else self.authority client = _ClientWithCcsRoutingInfo( @@ -1012,14 +1049,23 @@ def _find_msal_accounts(self, environment): } return list(grouped_accounts.values()) + def _get_instance_metadata(self): # This exists so it can be mocked in unit test + resp = self.http_client.get( + "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize", # TBD: We may extend this to use self._instance_discovery endpoint + headers={'Accept': 'application/json'}) + resp.raise_for_status() + return json.loads(resp.text)['metadata'] + def _get_authority_aliases(self, instance): + if self._instance_discovery is False: + return [] + if self.authority._is_known_to_developer: + # Then it is an ADFS/B2C/known_authority_hosts situation + # which may not reach the central endpoint, so we skip it. + return [] if not self.authority_groups: - resp = self.http_client.get( - "https://login.microsoftonline.com/common/discovery/instance?api-version=1.1&authorization_endpoint=https://login.microsoftonline.com/common/oauth2/authorize", - headers={'Accept': 'application/json'}) - resp.raise_for_status() self.authority_groups = [ - set(group['aliases']) for group in json.loads(resp.text)['metadata']] + set(group['aliases']) for group in self._get_instance_metadata()] for group in self.authority_groups: if instance in group: return [alias for alias in group if alias != instance] @@ -1168,6 +1214,7 @@ def acquire_token_silent_with_error( # the_authority = Authority( # authority, # self.http_client, + # instance_discovery=self._instance_discovery, # ) if authority else self.authority result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( scopes, account, self.authority, force_refresh=force_refresh, @@ -1189,7 +1236,8 @@ def acquire_token_silent_with_error( the_authority = Authority( "https://" + alias + "/" + self.authority.tenant, self.http_client, - validate_authority=False) + instance_discovery=False, + ) result = self._acquire_token_silent_from_cache_and_possibly_refresh_it( scopes, account, the_authority, force_refresh=force_refresh, claims_challenge=claims_challenge, diff --git a/msal/authority.py b/msal/authority.py index 81788200..216fed5d 100644 --- a/msal/authority.py +++ b/msal/authority.py @@ -58,7 +58,11 @@ def http_client(self): # Obsolete. We will remove this eventually "authority.http_client might be removed in MSAL Python 1.21+", DeprecationWarning) return self._http_client - def __init__(self, authority_url, http_client, validate_authority=True): + def __init__( + self, authority_url, http_client, + validate_authority=True, + instance_discovery=None, + ): """Creates an authority instance, and also validates it. :param validate_authority: @@ -67,19 +71,34 @@ def __init__(self, authority_url, http_client, validate_authority=True): This parameter only controls whether an instance discovery will be performed. """ + # :param instance_discovery: + # By default, the known-to-Microsoft validation will use an + # instance discovery endpoint located at ``login.microsoftonline.com``. + # You can customize the endpoint by providing a url as a string. + # Or you can turn this behavior off by passing in a False here. self._http_client = http_client if isinstance(authority_url, AuthorityBuilder): authority_url = str(authority_url) authority, self.instance, tenant = canonicalize(authority_url) + self.is_adfs = tenant.lower() == 'adfs' parts = authority.path.split('/') - is_b2c = any(self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS) or ( - len(parts) == 3 and parts[2].lower().startswith("b2c_")) - if (tenant != "adfs" and (not is_b2c) and validate_authority - and self.instance not in WELL_KNOWN_AUTHORITY_HOSTS): - payload = instance_discovery( + is_b2c = any( + self.instance.endswith("." + d) for d in WELL_KNOWN_B2C_HOSTS + ) or (len(parts) == 3 and parts[2].lower().startswith("b2c_")) + self._is_known_to_developer = self.is_adfs or is_b2c or not validate_authority + is_known_to_microsoft = self.instance in WELL_KNOWN_AUTHORITY_HOSTS + instance_discovery_endpoint = 'https://{}/common/discovery/instance'.format( # Note: This URL seemingly returns V1 endpoint only + WORLD_WIDE # Historically using WORLD_WIDE. Could use self.instance too + # See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103 + # and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33 + ) if instance_discovery in (None, True) else instance_discovery + if instance_discovery_endpoint and not ( + is_known_to_microsoft or self._is_known_to_developer): + payload = _instance_discovery( "https://{}{}/oauth2/v2.0/authorize".format( self.instance, authority.path), - self._http_client) + self._http_client, + instance_discovery_endpoint) if payload.get("error") == "invalid_instance": raise ValueError( "invalid_instance: " @@ -113,7 +132,6 @@ def __init__(self, authority_url, http_client, validate_authority=True): self.token_endpoint = openid_config['token_endpoint'] self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint') _, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID - self.is_adfs = self.tenant.lower() == 'adfs' def user_realm_discovery(self, username, correlation_id=None, response=None): # It will typically return a dict containing "ver", "account_type", @@ -145,13 +163,9 @@ def canonicalize(authority_url): % authority_url) return authority, authority.hostname, parts[1] -def instance_discovery(url, http_client, **kwargs): - resp = http_client.get( # Note: This URL seemingly returns V1 endpoint only - 'https://{}/common/discovery/instance'.format( - WORLD_WIDE # Historically using WORLD_WIDE. Could use self.instance too - # See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103 - # and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33 - ), +def _instance_discovery(url, http_client, instance_discovery_endpoint, **kwargs): + resp = http_client.get( + instance_discovery_endpoint, params={'authorization_endpoint': url, 'api-version': '1.0'}, **kwargs) return json.loads(resp.text) diff --git a/tests/test_authority.py b/tests/test_authority.py index fc6e12fc..dd91afbb 100644 --- a/tests/test_authority.py +++ b/tests/test_authority.py @@ -1,5 +1,10 @@ import os +try: + from unittest.mock import patch +except: + from mock import patch +import msal from msal.authority import * from tests import unittest from tests.http_client import MinimalHttpClient @@ -123,3 +128,64 @@ class MockResponse(object): finally: # MUST NOT let the previous test changes affect other test cases Authority._domains_without_user_realm_discovery = set([]) + +@patch("msal.authority.tenant_discovery", return_value={ + "authorization_endpoint": "https://contoso.com/placeholder", + "token_endpoint": "https://contoso.com/placeholder", + }) +@patch("msal.authority._instance_discovery") +@patch.object(msal.ClientApplication, "_get_instance_metadata", return_value=[]) +class TestMsalBehaviorsWithoutAndWithInstanceDiscoveryBoolean(unittest.TestCase): + """Test cases use ClientApplication, which is a base class of both PCA and CCA""" + + def test_by_default_a_known_to_microsoft_authority_should_skip_validation_but_still_use_instance_metadata( + self, instance_metadata, known_to_microsoft_validation, _): + app = msal.ClientApplication("id", authority="https://login.microsoftonline.com/common") + known_to_microsoft_validation.assert_not_called() + app.get_accounts() # This could make an instance metadata call for authority aliases + instance_metadata.assert_called_once_with() + + def test_validate_authority_boolean_should_skip_validation_and_instance_metadata( + self, instance_metadata, known_to_microsoft_validation, _): + """Pending deprecation, but kept for backward compatibility, for now""" + app = msal.ClientApplication( + "id", authority="https://contoso.com/common", validate_authority=False) + known_to_microsoft_validation.assert_not_called() + app.get_accounts() # This could make an instance metadata call for authority aliases + instance_metadata.assert_not_called() + + def test_by_default_adfs_should_skip_validation_and_instance_metadata( + self, instance_metadata, known_to_microsoft_validation, _): + """Not strictly required, but when/if we already supported it, we better keep it""" + app = msal.ClientApplication("id", authority="https://contoso.com/adfs") + known_to_microsoft_validation.assert_not_called() + app.get_accounts() # This could make an instance metadata call for authority aliases + instance_metadata.assert_not_called() + + def test_by_default_b2c_should_skip_validation_and_instance_metadata( + self, instance_metadata, known_to_microsoft_validation, _): + """Not strictly required, but when/if we already supported it, we better keep it""" + app = msal.ClientApplication( + "id", authority="https://login.b2clogin.com/contoso/b2c_policy") + known_to_microsoft_validation.assert_not_called() + app.get_accounts() # This could make an instance metadata call for authority aliases + instance_metadata.assert_not_called() + + def test_turning_off_instance_discovery_should_work_for_all_kinds_of_clouds( + self, instance_metadata, known_to_microsoft_validation, _): + for authority in [ + "https://login.microsoftonline.com/common", # Known to Microsoft + "https://contoso.com/adfs", # ADFS + "https://login.b2clogin.com/contoso/b2c_policy", # B2C + "https://private.cloud/foo", # Private Cloud + ]: + self._test_turning_off_instance_discovery_should_skip_authority_validation_and_instance_metadata( + authority, instance_metadata, known_to_microsoft_validation) + + def _test_turning_off_instance_discovery_should_skip_authority_validation_and_instance_metadata( + self, authority, instance_metadata, known_to_microsoft_validation): + app = msal.ClientApplication("id", authority=authority, instance_discovery=False) + known_to_microsoft_validation.assert_not_called() + app.get_accounts() # This could make an instance metadata call for authority aliases + instance_metadata.assert_not_called() +