Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdk/identity/azure-identity/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
### Added
- `InteractiveBrowserCredential` uses PKCE internally to protect authorization
codes
- `CertificateCredential` can load a certificate from bytes instead of a file
path. To provide a certificate as bytes, use the keyword argument
`certificate_bytes` instead of `certificate_path`, for example:
`CertificateCredential(tenant_id, client_id, certificate_bytes=cert_bytes)`
([#14055](https://github.com/Azure/azure-sdk-for-python/issues/14055))

## 1.5.0 (2020-11-11)
### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@
from .._internal.client_credential_base import ClientCredentialBase

if TYPE_CHECKING:
from typing import Any
from typing import Any, Optional, Union


class CertificateCredential(ClientCredentialBase):
"""Authenticates as a service principal using a certificate.

:param str tenant_id: ID of the service principal's tenant. Also called its 'directory' ID.
:param str client_id: the service principal's client ID
:param str certificate_path: path to a PEM-encoded certificate file including the private key.
:param str certificate_path: path to a PEM-encoded certificate file including the private key. If not provided,
`certificate_bytes` is required.

:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
defines authorities for other clouds.
:keyword bytes certificate_bytes: the bytes of a certificate in PEM format, including the private key
:keyword password: The certificate's password. If a unicode string, it will be encoded as UTF-8. If the certificate
requires a different encoding, pass appropriately encoded bytes instead.
:paramtype password: str or bytes
Expand All @@ -39,37 +41,11 @@ class CertificateCredential(ClientCredentialBase):
is unavailable. Default to False. Has no effect when `enable_persistent_cache` is False.
"""

def __init__(self, tenant_id, client_id, certificate_path, **kwargs):
# type: (str, str, str, **Any) -> None
def __init__(self, tenant_id, client_id, certificate_path=None, **kwargs):
# type: (str, str, Optional[str], **Any) -> None
validate_tenant_id(tenant_id)
if not certificate_path:
raise ValueError(
"'certificate_path' must be the path to a PEM file containing an x509 certificate and its private key"
)

password = kwargs.pop("password", None)
if isinstance(password, six.text_type):
password = password.encode(encoding="utf-8")

with open(certificate_path, "rb") as f:
pem_bytes = f.read()

cert = x509.load_pem_x509_certificate(pem_bytes, default_backend())
fingerprint = cert.fingerprint(hashes.SHA1()) # nosec

client_credential = {"private_key": pem_bytes, "thumbprint": hexlify(fingerprint).decode("utf-8")}
if password:
client_credential["passphrase"] = password

if kwargs.pop("send_certificate_chain", False):
try:
# the JWT needs the whole chain but load_pem_x509_certificate deserializes only the signing cert
chain = extract_cert_chain(pem_bytes)
client_credential["public_certificate"] = six.ensure_str(chain)
except ValueError as ex:
# we shouldn't land here, because load_pem_private_key should have raised when given a malformed file
message = 'Found no PEM encoded certificate in "{}"'.format(certificate_path)
six.raise_from(ValueError(message), ex)
client_credential = get_client_credential(certificate_path, **kwargs)

super(CertificateCredential, self).__init__(
client_id=client_id, client_credential=client_credential, tenant_id=tenant_id, **kwargs
Expand All @@ -84,6 +60,38 @@ def extract_cert_chain(pem_bytes):
start = pem_bytes.index(b"-----BEGIN CERTIFICATE-----")
footer = b"-----END CERTIFICATE-----"
end = pem_bytes.rindex(footer)
chain = pem_bytes[start:end + len(footer) + 1]
chain = pem_bytes[start : end + len(footer) + 1]

return b"".join(chain.splitlines())


def get_client_credential(certificate_path, password=None, certificate_bytes=None, send_certificate_chain=False, **_):
# type: (Optional[str], Optional[Union[bytes, str]], Optional[bytes], bool, **Any) -> dict
"""Load a certificate from a filesystem path or bytes, return it as a dict suitable for msal.ClientApplication"""

if certificate_path:
with open(certificate_path, "rb") as f:
certificate_bytes = f.read()
elif not certificate_bytes:
raise ValueError('This credential requires a value for "certificate_path" or "certificate_bytes"')

if isinstance(password, six.text_type):
password = password.encode(encoding="utf-8")

cert = x509.load_pem_x509_certificate(certificate_bytes, default_backend())
fingerprint = cert.fingerprint(hashes.SHA1()) # nosec

client_credential = {"private_key": certificate_bytes, "thumbprint": hexlify(fingerprint).decode("utf-8")}
if password:
client_credential["passphrase"] = password

if send_certificate_chain:
try:
# the JWT needs the whole chain but load_pem_x509_certificate deserializes only the signing cert
chain = extract_cert_chain(certificate_bytes)
client_credential["public_certificate"] = six.ensure_str(chain)
except ValueError as ex:
# we shouldn't land here--cryptography already loaded the cert and would have raised if it were malformed
six.raise_from(ValueError("Malformed certificate"), ex)

return client_credential
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def validate_tenant_id(tenant_id):
from .aad_client_base import AadClientBase
from .auth_code_redirect_handler import AuthCodeRedirectServer
from .aadclient_certificate import AadClientCertificate
from .certificate_credential_base import CertificateCredentialBase
from .client_secret_credential_base import ClientSecretCredentialBase
from .decorators import wrap_exceptions
from .interactive import InteractiveCredential
Expand All @@ -72,7 +71,6 @@ def _scopes_to_resource(*scopes):
"AadClientBase",
"AuthCodeRedirectServer",
"AadClientCertificate",
"CertificateCredentialBase",
"ClientSecretCredentialBase",
"get_default_authority",
"InteractiveCredential",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,56 @@
# ------------------------------------
from typing import TYPE_CHECKING

from msal import TokenCache

from .._internal import AadClient, AsyncContextManager
from .._internal.decorators import log_get_token_async
from ..._internal import CertificateCredentialBase
from ..._credentials.certificate import get_client_credential
from ..._internal import AadClientCertificate, validate_tenant_id
from ..._internal.persistent_cache import load_service_principal_cache

if TYPE_CHECKING:
from typing import Any
from typing import Any, Optional
from azure.core.credentials import AccessToken


class CertificateCredential(CertificateCredentialBase, AsyncContextManager):
class CertificateCredential(AsyncContextManager):
"""Authenticates as a service principal using a certificate.

:param str tenant_id: ID of the service principal's tenant. Also called its 'directory' ID.
:param str client_id: the service principal's client ID
:param str certificate_path: path to a PEM-encoded certificate file including the private key
:param str certificate_path: path to a PEM-encoded certificate file including the private key. If not provided,
`certificate_bytes` is required.

:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
defines authorities for other clouds.
:keyword bytes certificate_bytes: the bytes of a certificate in PEM format, including the private key
:keyword password: The certificate's password. If a unicode string, it will be encoded as UTF-8. If the certificate
requires a different encoding, pass appropriately encoded bytes instead.
:paramtype password: str or bytes
"""

def __init__(self, tenant_id, client_id, certificate_path=None, **kwargs):
# type: (str, str, Optional[str], **Any) -> None
validate_tenant_id(tenant_id)

client_credential = get_client_credential(certificate_path, **kwargs)

self._certificate = AadClientCertificate(
client_credential["private_key"], password=client_credential.get("passphrase")
)

enable_persistent_cache = kwargs.pop("enable_persistent_cache", False)
if enable_persistent_cache:
allow_unencrypted = kwargs.pop("allow_unencrypted_cache", False)
cache = load_service_principal_cache(allow_unencrypted)
else:
cache = TokenCache()

self._client = AadClient(tenant_id, client_id, cache=cache, **kwargs)
self._client_id = client_id

async def __aenter__(self):
await self._client.__aenter__()
return self
Expand Down Expand Up @@ -61,6 +87,3 @@ async def get_token(self, *scopes: str, **kwargs: "Any") -> "AccessToken": # py
except Exception: # pylint: disable=broad-except
pass
return token

def _get_auth_client(self, tenant_id, client_id, **kwargs):
return AadClient(tenant_id, client_id, **kwargs)
57 changes: 24 additions & 33 deletions sdk/identity/azure-identity/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys

import pytest
import six
from azure.identity._constants import DEVELOPER_SIGN_ON_CLIENT_ID, EnvironmentVariables


Expand Down Expand Up @@ -62,43 +63,32 @@ def live_service_principal(): # pylint:disable=inconsistent-return-statements


@pytest.fixture()
def live_certificate(live_service_principal): # pylint:disable=inconsistent-return-statements,redefined-outer-name
"""Provides a path to a PEM-encoded certificate with no password"""

pem_content = os.environ.get("PEM_CONTENT")
if not pem_content:
pytest.skip("Expected PEM content in environment variable 'PEM_CONTENT'")
return

pem_path = os.path.join(os.path.dirname(__file__), "certificate.pem")
try:
with open(pem_path, "w") as pem_file:
pem_file.write(pem_content)
return dict(live_service_principal, cert_path=pem_path)
except IOError as ex:
pytest.skip("Failed to write file '{}': {}".format(pem_path, ex))
def live_certificate(live_service_principal):
content = os.environ.get("PEM_CONTENT")
password_protected_content = os.environ.get("PEM_CONTENT_PASSWORD_PROTECTED")
password = os.environ.get("CERTIFICATE_PASSWORD")

if content and password_protected_content and password:
current_directory = os.path.dirname(__file__)
parameters = {
"cert_bytes": six.ensure_binary(content),
"cert_path": os.path.join(current_directory, "certificate.pem"),
"cert_with_password_bytes": six.ensure_binary(password_protected_content),
"cert_with_password_path": os.path.join(current_directory, "certificate-with-password.pem"),
"password": password,
}

@pytest.fixture()
def live_certificate_with_password(live_service_principal):
"""Provides a path to a PEM-encoded, password-protected certificate, and its password"""
try:
with open(parameters["cert_path"], "wb") as f:
f.write(parameters["cert_bytes"])
with open(parameters["cert_with_password_path"], "wb") as f:
f.write(parameters["cert_with_password_bytes"])
except IOError as ex:
pytest.skip("Failed to write a file: {}".format(ex))

pem_content = os.environ.get("PEM_CONTENT_PASSWORD_PROTECTED")
password = os.environ.get("CERTIFICATE_PASSWORD")
if not (pem_content and password):
pytest.skip(
"Expected password-protected PEM content in environment variable 'PEM_CONTENT_PASSWORD_PROTECTED'"
+ " and the password in 'CERTIFICATE_PASSWORD'"
)
return
return dict(live_service_principal, **parameters)

pem_path = os.path.join(os.path.dirname(__file__), "certificate-with-password.pem")
try:
with open(pem_path, "w") as pem_file:
pem_file.write(pem_content)
return dict(live_service_principal, cert_path=pem_path, password=password)
except IOError as ex:
pytest.skip("Failed to write file '{}': {}".format(pem_path, ex))
pytest.skip("Missing PEM certificate configuration")


@pytest.fixture()
Expand All @@ -114,6 +104,7 @@ def live_user_details():
else:
return user_details


@pytest.fixture()
def event_loop():
"""Ensure the event loop used by pytest-asyncio on Windows is ProactorEventLoop, which supports subprocesses.
Expand Down
31 changes: 31 additions & 0 deletions sdk/identity/azure-identity/tests/test_certificate_credential.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,21 @@ def test_authority(authority):
assert kwargs["authority"] == expected_authority


def test_requires_certificate():
"""the credential should raise ValueError when not given a certificate"""

with pytest.raises(ValueError):
CertificateCredential("tenant", "client-id")
with pytest.raises(ValueError):
CertificateCredential("tenant", "client-id", certificate_path=None)
with pytest.raises(ValueError):
CertificateCredential("tenant", "client-id", certificate_path="")
with pytest.raises(ValueError):
CertificateCredential("tenant", "client-id", certificate_bytes=None)
with pytest.raises(ValueError):
CertificateCredential("tenant", "client-id", certificate_path="", certificate_bytes=None)


@pytest.mark.parametrize("cert_path,cert_password", BOTH_CERTS)
@pytest.mark.parametrize("send_certificate_chain", (True, False))
def test_request_body(cert_path, cert_password, send_certificate_chain):
Expand Down Expand Up @@ -158,6 +173,22 @@ def mock_send(request, **kwargs):
token = cred.get_token(expected_scope)
assert token.token == access_token

# credential should also accept the certificate as bytes
with open(cert_path, "rb") as f:
cert_bytes = f.read()

cred = CertificateCredential(
tenant_id,
client_id,
certificate_bytes=cert_bytes,
password=cert_password,
transport=Mock(send=mock_send),
authority=authority,
send_certificate_chain=send_certificate_chain,
)
token = cred.get_token(expected_scope)
assert token.token == access_token


def validate_jwt(request, client_id, pem_bytes, expect_x5c=False):
"""Validate the request meets AAD's expectations for a client credential grant using a certificate, as documented
Expand Down
Loading