Skip to content

Commit 670b78c

Browse files
authored
CertificateCredential supports PKCS12 certs (#16384)
1 parent f4ce7eb commit 670b78c

19 files changed

+217
-187
lines changed

sdk/identity/azure-identity/CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
## 1.7.0b4 (Unreleased)
44

55
### Features Added
6+
- `CertificateCredential` accepts certificates in PKCS12 format
7+
([#13540](https://github.com/Azure/azure-sdk-for-python/issues/13540))
68

79
### Breaking Changes
810

@@ -15,6 +17,16 @@
1517
([#18798](https://github.com/Azure/azure-sdk-for-python/issues/18798))
1618

1719

20+
21+
## 1.6.1 (2021-08-19)
22+
23+
### Other Changes
24+
- Persistent cache implementations are now loaded on demand, enabling
25+
workarounds when importing transitive dependencies such as pywin32
26+
fails
27+
([#19989](https://github.com/Azure/azure-sdk-for-python/issues/19989))
28+
29+
1830
## 1.7.0b3 (2021-08-10)
1931

2032
### Breaking Changes

sdk/identity/azure-identity/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ variables:
272272
|-|-
273273
|`AZURE_CLIENT_ID`|id of an Azure Active Directory application
274274
|`AZURE_TENANT_ID`|id of the application's Azure Active Directory tenant
275-
|`AZURE_CLIENT_CERTIFICATE_PATH`|path to a PEM-encoded certificate file including private key (without password protection)
275+
|`AZURE_CLIENT_CERTIFICATE_PATH`|path to a PEM or PKCS12 certificate file including private key (without password protection)
276276

277277
#### Username and password
278278
|variable name|value

sdk/identity/azure-identity/azure/identity/_credentials/certificate.py

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# Licensed under the MIT License.
44
# ------------------------------------
55
from binascii import hexlify
6-
from typing import TYPE_CHECKING
6+
from typing import cast, NamedTuple, TYPE_CHECKING
77

88
from cryptography import x509
99
from cryptography.hazmat.primitives import hashes, serialization
@@ -15,6 +15,7 @@
1515
from .._internal.client_credential_base import ClientCredentialBase
1616

1717
if TYPE_CHECKING:
18+
# pylint:disable=ungrouped-imports
1819
from typing import Any, Optional, Union
1920

2021

@@ -28,13 +29,13 @@ class CertificateCredential(ClientCredentialBase):
2829
2930
:param str tenant_id: ID of the service principal's tenant. Also called its "directory" ID.
3031
:param str client_id: the service principal's client ID
31-
:param str certificate_path: path to a PEM-encoded certificate file including the private key. If not provided,
32-
**certificate_data** is required.
32+
:param str certificate_path: Optional path to a certificate file in PEM or PKCS12 format, including the private
33+
key. If not provided, **certificate_data** is required.
3334
34-
:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
35+
:keyword str authority: Authority of an Azure Active Directory endpoint, for example "login.microsoftonline.com",
3536
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
3637
defines authorities for other clouds.
37-
:keyword bytes certificate_data: the bytes of a certificate in PEM format, including the private key
38+
:keyword bytes certificate_data: the bytes of a certificate in PEM or PKCS12 format, including the private key
3839
:keyword password: The certificate's password. If a unicode string, it will be encoded as UTF-8. If the certificate
3940
requires a different encoding, pass appropriately encoded bytes instead.
4041
:paramtype password: str or bytes
@@ -76,6 +77,42 @@ def extract_cert_chain(pem_bytes):
7677
return b"".join(chain.splitlines())
7778

7879

80+
_Cert = NamedTuple("_Cert", [("pem_bytes", bytes), ("private_key", "Any"), ("fingerprint", bytes)])
81+
82+
83+
def load_pem_certificate(certificate_data, password):
84+
# type: (bytes, Optional[bytes]) -> _Cert
85+
private_key = serialization.load_pem_private_key(certificate_data, password, backend=default_backend())
86+
cert = x509.load_pem_x509_certificate(certificate_data, default_backend())
87+
fingerprint = cert.fingerprint(hashes.SHA1()) # nosec
88+
return _Cert(certificate_data, private_key, fingerprint)
89+
90+
91+
def load_pkcs12_certificate(certificate_data, password):
92+
# type: (bytes, Optional[bytes]) -> _Cert
93+
from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, pkcs12, PrivateFormat
94+
95+
private_key, cert, additional_certs = pkcs12.load_key_and_certificates(
96+
certificate_data, password, backend=default_backend()
97+
)
98+
if not private_key:
99+
raise ValueError("The certificate must include its private key")
100+
if not cert:
101+
# mentioning PEM here because we raise this error when certificate_data is garbage
102+
raise ValueError("Failed to deserialize certificate in PEM or PKCS12 format")
103+
104+
# This serializes the private key without any encryption it may have had. Doing so doesn't violate security
105+
# boundaries because this representation of the key is kept in memory. We already have the key and its
106+
# password, if any, in memory.
107+
key_bytes = private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
108+
pem_sections = [key_bytes] + [c.public_bytes(Encoding.PEM) for c in [cert] + additional_certs]
109+
pem_bytes = b"".join(pem_sections)
110+
111+
fingerprint = cert.fingerprint(hashes.SHA1()) # nosec
112+
113+
return _Cert(pem_bytes, private_key, fingerprint)
114+
115+
79116
def get_client_credential(certificate_path, password=None, certificate_data=None, send_certificate_chain=False, **_):
80117
# type: (Optional[str], Optional[Union[bytes, str]], Optional[bytes], bool, **Any) -> dict
81118
"""Load a certificate from a filesystem path or bytes, return it as a dict suitable for msal.ClientApplication"""
@@ -88,24 +125,28 @@ def get_client_credential(certificate_path, password=None, certificate_data=None
88125
elif not certificate_data:
89126
raise ValueError('CertificateCredential requires a value for either "certificate_path" or "certificate_data"')
90127

91-
if isinstance(password, six.text_type):
92-
password = password.encode(encoding="utf-8")
128+
if password:
129+
# if password is already bytes, this won't change its encoding
130+
password = six.ensure_binary(password, "utf-8")
131+
password = cast("Optional[bytes]", password)
93132

94-
private_key = serialization.load_pem_private_key(certificate_data, password=password, backend=default_backend())
95-
if not isinstance(private_key, RSAPrivateKey):
96-
raise ValueError("CertificateCredential requires an RSA private key because it uses RS256 for signing")
133+
if certificate_data.startswith(b"-----"):
134+
cert = load_pem_certificate(certificate_data, password)
135+
else:
136+
cert = load_pkcs12_certificate(certificate_data, password)
137+
password = None # load_pkcs12_certificate returns cert.pem_bytes decrypted
97138

98-
cert = x509.load_pem_x509_certificate(certificate_data, default_backend())
99-
fingerprint = cert.fingerprint(hashes.SHA1()) # nosec
139+
if not isinstance(cert.private_key, RSAPrivateKey):
140+
raise ValueError("CertificateCredential requires an RSA private key because it uses RS256 for signing")
100141

101-
client_credential = {"private_key": certificate_data, "thumbprint": hexlify(fingerprint).decode("utf-8")}
142+
client_credential = {"private_key": cert.pem_bytes, "thumbprint": hexlify(cert.fingerprint).decode("utf-8")}
102143
if password:
103144
client_credential["passphrase"] = password
104145

105146
if send_certificate_chain:
106147
try:
107148
# the JWT needs the whole chain but load_pem_x509_certificate deserializes only the signing cert
108-
chain = extract_cert_chain(certificate_data)
149+
chain = extract_cert_chain(cert.pem_bytes)
109150
client_credential["public_certificate"] = six.ensure_str(chain)
110151
except ValueError as ex:
111152
# we shouldn't land here--cryptography already loaded the cert and would have raised if it were malformed

sdk/identity/azure-identity/azure/identity/_credentials/environment.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class EnvironmentCredential(object):
4242
Service principal with certificate:
4343
- **AZURE_TENANT_ID**: ID of the service principal's tenant. Also called its 'directory' ID.
4444
- **AZURE_CLIENT_ID**: the service principal's client ID
45-
- **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM-encoded certificate file including the private key. The
45+
- **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM or PKCS12 certificate file including the private key. The
4646
certificate must not be password-protected.
4747
4848
User with username and password:

sdk/identity/azure-identity/azure/identity/aio/_credentials/environment.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class EnvironmentCredential(AsyncContextManager):
3535
Service principal with certificate:
3636
- **AZURE_TENANT_ID**: ID of the service principal's tenant. Also called its 'directory' ID.
3737
- **AZURE_CLIENT_ID**: the service principal's client ID
38-
- **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM-encoded certificate file including the private key. The
38+
- **AZURE_CLIENT_CERTIFICATE_PATH**: path to a PEM or PKCS12 certificate file including the private key. The
3939
certificate must not be password-protected.
4040
4141
:keyword bool allow_multitenant_authentication: when True, enables the credential to acquire tokens from any tenant

sdk/identity/azure-identity/conftest.py

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def record_imds_test(request):
5555

5656

5757
@pytest.fixture()
58-
def live_service_principal(): # pylint:disable=inconsistent-return-statements
58+
def live_service_principal():
5959
"""Fixture for live Identity tests. Skips them when environment configuration is incomplete."""
6060

6161
missing_variables = [
@@ -77,33 +77,58 @@ def live_service_principal(): # pylint:disable=inconsistent-return-statements
7777
}
7878

7979

80+
def get_certificate_parameters(content, password_protected_content, password, extension):
81+
# type: (bytes, bytes, str, str) -> dict
82+
current_directory = os.path.dirname(__file__)
83+
parameters = {
84+
"cert_bytes": six.ensure_binary(content),
85+
"cert_path": os.path.join(current_directory, "certificate." + extension),
86+
"cert_with_password_bytes": six.ensure_binary(password_protected_content),
87+
"cert_with_password_path": os.path.join(current_directory, "certificate-with-password." + extension),
88+
"password": password,
89+
}
90+
91+
try:
92+
with open(parameters["cert_path"], "wb") as f:
93+
f.write(parameters["cert_bytes"])
94+
with open(parameters["cert_with_password_path"], "wb") as f:
95+
f.write(parameters["cert_with_password_bytes"])
96+
except IOError as ex:
97+
pytest.skip("Failed to write a file: {}".format(ex))
98+
99+
return parameters
100+
101+
80102
@pytest.fixture()
81-
def live_certificate(live_service_principal):
103+
def live_pem_certificate(live_service_principal):
82104
content = os.environ.get("PEM_CONTENT")
83105
password_protected_content = os.environ.get("PEM_CONTENT_PASSWORD_PROTECTED")
84106
password = os.environ.get("CERTIFICATE_PASSWORD")
85107

86108
if content and password_protected_content and password:
87-
current_directory = os.path.dirname(__file__)
88-
parameters = {
89-
"cert_bytes": six.ensure_binary(content),
90-
"cert_path": os.path.join(current_directory, "certificate.pem"),
91-
"cert_with_password_bytes": six.ensure_binary(password_protected_content),
92-
"cert_with_password_path": os.path.join(current_directory, "certificate-with-password.pem"),
93-
"password": password,
94-
}
109+
parameters = get_certificate_parameters(content, password_protected_content, password, "pem")
110+
return dict(live_service_principal, **parameters)
95111

96-
try:
97-
with open(parameters["cert_path"], "wb") as f:
98-
f.write(parameters["cert_bytes"])
99-
with open(parameters["cert_with_password_path"], "wb") as f:
100-
f.write(parameters["cert_with_password_bytes"])
101-
except IOError as ex:
102-
pytest.skip("Failed to write a file: {}".format(ex))
112+
pytest.skip("Missing PEM certificate configuration")
113+
114+
115+
@pytest.fixture()
116+
def live_pfx_certificate(live_service_principal):
117+
# PFX bytes arrive base64 encoded because Key Vault secrets have string values
118+
encoded_content = os.environ.get("PFX_CONTENT")
119+
encoded_password_protected_content = os.environ.get("PFX_CONTENT_PASSWORD_PROTECTED")
120+
password = os.environ.get("CERTIFICATE_PASSWORD")
121+
122+
if encoded_content and encoded_password_protected_content and password:
123+
import base64
103124

125+
content = base64.b64decode(six.ensure_binary(encoded_content))
126+
password_protected_content = base64.b64decode(six.ensure_binary(encoded_password_protected_content))
127+
128+
parameters = get_certificate_parameters(content, password_protected_content, password, "pfx")
104129
return dict(live_service_principal, **parameters)
105130

106-
pytest.skip("Missing PEM certificate configuration")
131+
pytest.skip("Missing PFX certificate configuration")
107132

108133

109134
@pytest.fixture()

sdk/identity/azure-identity/samples/key_vault_cert.py

Lines changed: 18 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,7 @@
1313
import os
1414

1515
from azure.identity import CertificateCredential, DefaultAzureCredential
16-
from azure.keyvault.certificates import (
17-
CertificateClient,
18-
CertificateContentType,
19-
CertificatePolicy,
20-
WellKnownIssuerNames,
21-
)
16+
from azure.keyvault.certificates import CertificateClient, CertificateContentType, CertificatePolicy
2217
from azure.keyvault.secrets import SecretClient
2318

2419
VAULT_URL = os.environ["VAULT_URL"]
@@ -31,79 +26,24 @@
3126
# Key Vault stores certificate private keys as secrets, so we use a SecretClient to retrieve them
3227
SECRET_CLIENT = SecretClient(VAULT_URL, credential)
3328

29+
# Creating a self-signed cert to work with
30+
create_cert_poller = CERT_CLIENT.begin_create_certificate("azure-identity-sample", CertificatePolicy.get_default())
31+
cert = create_cert_poller.result()
3432

35-
def pkcs12_cert():
36-
"""Demonstrates creating a CertificateCredential with a Key Vault certificate stored in PKCS12 (default) format"""
33+
# The certificate as returned by begin_create_certificate() or get_certificate() contains
34+
# only the public portion of the certificate. Key Vault will release the private key only
35+
# if the certificate's policy indicates it's exportable (certs are exportable by default).
36+
policy = CERT_CLIENT.get_certificate_policy(cert.name)
37+
assert policy.exportable, "Expected an exportable certificate because that's Key Vault's default"
3738

38-
# Creating a self-signed cert to work with
39-
create_cert_poller = CERT_CLIENT.begin_create_certificate(
40-
"azure-identity-sample-default", CertificatePolicy.get_default()
41-
)
42-
cert = create_cert_poller.result()
39+
# The policy's content_type indicates whether the certificate is stored in PEM or PKCS12 format
40+
assert policy.content_type == CertificateContentType.pkcs12, "Expected PKCS12 because that's Key Vault's default"
4341

44-
# CertificateCredential requires the certificate and its private key in PEM format.
45-
# The certificate as returned by begin_create_certificate() or get_certificate() contains
46-
# only the public portion of the certificate. Key Vault will release the private key only
47-
# if the certificate's policy indicates it's exportable (certs are exportable by default).
48-
policy = CERT_CLIENT.get_certificate_policy(cert.name)
49-
assert policy.exportable, "Expected an exportable certificate because that's Key Vault's default"
42+
# Key Vault stores the complete certificate, with its private key, as a secret sharing the certificate's name
43+
# Because this certificate is stored in PKCS12 format, the secret's value is base64 encoded bytes
44+
encoded_cert = SECRET_CLIENT.get_secret(cert.name).value
45+
pkcs12_bytes = base64.b64decode(encoded_cert)
5046

51-
# The policy's content_type indicates whether the certificate is stored in PEM or PKCS12 format
52-
assert policy.content_type == CertificateContentType.pkcs12, "Expected PKCS12 because that's Key Vault's default"
53-
54-
# Key Vault stores the complete certificate, with its private key, as a secret sharing the certificate's name
55-
# Because this certificate is stored in PKCS12 format, the secret's value is base64 encoded bytes
56-
encoded_cert = SECRET_CLIENT.get_secret(cert.name).value
57-
pkcs12_bytes = base64.b64decode(encoded_cert)
58-
59-
# cryptography can convert PKCS12 to PEM
60-
def pkcs12_to_pem(pkcs12_bytes):
61-
"""Convert certificate bytes from PKCS12 format to PEM using the "cryptography" library"""
62-
from cryptography.hazmat.backends import default_backend
63-
from cryptography.hazmat.primitives.serialization import Encoding, pkcs12, PrivateFormat, NoEncryption
64-
65-
private_key, cert, additional_certs = pkcs12.load_key_and_certificates(
66-
pkcs12_bytes, password=None, backend=default_backend()
67-
)
68-
69-
# using NoEncryption because the certificate created above is not password protected
70-
private_bytes = private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
71-
pem_sections = [private_bytes] + [c.public_bytes(Encoding.PEM) for c in [cert] + additional_certs]
72-
return b"".join(pem_sections)
73-
74-
pem_bytes = pkcs12_to_pem(pkcs12_bytes)
75-
76-
# This credential will load the certificate but can't actually authenticate. Authentication requires real
77-
# tenant and client IDs for a service principal configured to accept the certificate.
78-
CertificateCredential("tenant-id", "client-id", certificate_data=pem_bytes)
79-
80-
81-
def pem_cert():
82-
"""Demonstrates creating a CertificateCredential with a Key Vault certificate stored in PEM format"""
83-
84-
# creating a self-signed certificate stored in PEM format (PKCS12 is Key Vault's default format)
85-
pem_policy = CertificatePolicy(
86-
WellKnownIssuerNames.self, subject="CN=localhost", content_type=CertificateContentType.pem
87-
)
88-
pem_cert = CERT_CLIENT.begin_create_certificate("azure-identity-sample-pem", pem_policy).result()
89-
90-
# verifying the certificate is exportable and stored in PEM format, to
91-
# demonstrate how you would do so when you don't already have its policy
92-
policy = CERT_CLIENT.get_certificate_policy(pem_cert.name)
93-
assert policy.exportable, "Expected an exportable certificate because that's Key Vault's default"
94-
assert policy.content_type == CertificateContentType.pem
95-
96-
# Because the certificate is exportable, it's available (with its private key) as a secret
97-
pem_cert_secret = SECRET_CLIENT.get_secret(pem_cert.name)
98-
99-
# The secret's value is a string; CertificateCredential requires bytes
100-
pem_bytes = pem_cert_secret.value.encode()
101-
102-
# This credential will load the certificate but can't actually authenticate. Authentication requires real
103-
# tenant and client IDs for a service principal configured to accept the certificate.
104-
CertificateCredential("tenant-id", "client-id", certificate_data=pem_bytes)
105-
106-
107-
if __name__ == "__main__":
108-
pkcs12_cert()
109-
pem_cert()
47+
# This credential will load the certificate but can't actually authenticate. Authentication requires real
48+
# tenant and client IDs for a service principal configured to accept the certificate.
49+
CertificateCredential("tenant-id", "client-id", certificate_data=pkcs12_bytes)

sdk/identity/azure-identity/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
),
7474
install_requires=[
7575
"azure-core<2.0.0,>=1.11.0",
76-
"cryptography>=2.1.4",
76+
"cryptography>=2.5",
7777
"msal<2.0.0,>=1.12.0",
7878
"msal-extensions~=0.3.0",
7979
"six>=1.12.0",
2.43 KB
Binary file not shown.
3.87 KB
Binary file not shown.

0 commit comments

Comments
 (0)