Skip to content

Commit 33f1157

Browse files
Ray LuoRay Luo
authored andcommitted
PoC: Managed Identity for Azure VM
1 parent c3af17e commit 33f1157

File tree

3 files changed

+88
-9
lines changed

3 files changed

+88
-9
lines changed

msal/application.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1702,6 +1702,21 @@ def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
17021702
- an error response would contain "error" and usually "error_description".
17031703
"""
17041704
# TBD: force_refresh behavior
1705+
if self.client_credential is None:
1706+
from .imds import _scope_to_resource, _obtain_token
1707+
response = _obtain_token(
1708+
self.http_client,
1709+
" ".join(map(_scope_to_resource, scopes)),
1710+
client_id=self.client_id, # None for system-assigned, GUID for user-assigned
1711+
)
1712+
if "error" not in response:
1713+
self.token_cache.add(dict(
1714+
client_id=self.client_id,
1715+
scope=response["scope"].split() if "scope" in response else scopes,
1716+
token_endpoint=self.authority.token_endpoint,
1717+
response=response.copy(),
1718+
))
1719+
return response
17051720
if self.authority.tenant.lower() in ["common", "organizations"]:
17061721
warnings.warn(
17071722
"Using /common or /organizations authority "

msal/imds.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# All rights reserved.
3+
#
4+
# This code is licensed under the MIT License.
5+
import json
6+
import logging
7+
try: # Python 2
8+
from urlparse import urlparse
9+
except: # Python 3
10+
from urllib.parse import urlparse
11+
12+
logger = logging.getLogger(__name__)
13+
14+
def _scope_to_resource(scope): # This is an experimental reasonable-effort approach
15+
u = urlparse(scope)
16+
if u.scheme:
17+
return "{}://{}".format(u.scheme, u.netloc)
18+
return scope # There is no much else we can do here
19+
20+
def _obtain_token(http_client, resource, client_id=None):
21+
# Based on https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#get-a-token-using-http
22+
params = {
23+
"api-version": "2018-02-01",
24+
"resource": resource,
25+
}
26+
if client_id:
27+
params["client_id"] = client_id
28+
resp = http_client.get(
29+
"http://169.254.169.254/metadata/identity/oauth2/token",
30+
params=params,
31+
headers={"Metadata": "true"},
32+
)
33+
try:
34+
payload = json.loads(resp.text)
35+
if payload.get("access_token") and payload.get("expires_in"):
36+
return { # Normalizing the payload into OAuth2 format
37+
"access_token": payload["access_token"],
38+
"expires_in": int(payload["expires_in"]),
39+
"resource": payload.get("resource"),
40+
"token_type": payload.get("token_type", "Bearer"),
41+
}
42+
return payload # Typically an error
43+
except ValueError:
44+
logger.debug("IMDS emits unexpected payload: %s", resp.text)
45+
raise
46+

tests/msaltest.py

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,11 @@ def _input_scopes():
4141
accept_nonempty_string=True,
4242
).split()
4343

44-
def _select_account(app):
44+
def _select_account(app, show_confidential_app_placeholder=False):
4545
accounts = app.get_accounts()
46+
if show_confidential_app_placeholder and isinstance(
47+
app, msal.ConfidentialClientApplication):
48+
accounts.insert(0, {"username": "This Client"})
4649
if accounts:
4750
return _select_options(
4851
accounts,
@@ -54,11 +57,11 @@ def _select_account(app):
5457

5558
def acquire_token_silent(app):
5659
"""acquire_token_silent() - with an account already signed into MSAL Python."""
57-
account = _select_account(app)
60+
account = _select_account(app, show_confidential_app_placeholder=True)
5861
if account:
5962
pprint.pprint(app.acquire_token_silent(
6063
_input_scopes(),
61-
account=account,
64+
account=account if "home_account_id" in account else None,
6265
force_refresh=_input_boolean("Bypass MSAL Python's token cache?"),
6366
))
6467

@@ -127,6 +130,10 @@ def remove_account(app):
127130
app.remove_account(account)
128131
print('Account "{}" and/or its token(s) are signed out from MSAL Python'.format(account["username"]))
129132

133+
def acquire_token_for_client(app):
134+
"""acquire_token_for_client() - Only for confidential client"""
135+
pprint.pprint(app.acquire_token_for_client(_input_scopes()))
136+
130137
def exit(_):
131138
"""Exit"""
132139
bug_link = "https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/new/choose"
@@ -139,13 +146,12 @@ def main():
139146
{"client_id": "04b07795-8ddb-461a-bbee-02f9e1bf7b46", "name": "Azure CLI (Correctly configured for MSA-PT)"},
140147
{"client_id": "04f0c124-f2bc-4f59-8241-bf6df9866bbd", "name": "Visual Studio (Correctly configured for MSA-PT)"},
141148
{"client_id": "95de633a-083e-42f5-b444-a4295d8e9314", "name": "Whiteboard Services (Non MSA-PT app. Accepts AAD & MSA accounts.)"},
149+
{"client_id": None, "client_secret": None, "name": "System-assigned Managed Identity (Only works when running inside a supported environment, such as Azure VM)"},
142150
],
143151
option_renderer=lambda a: a["name"],
144152
header="Impersonate this app (or you can type in the client_id of your own app)",
145153
accept_nonempty_string=True)
146-
app = msal.PublicClientApplication(
147-
chosen_app["client_id"] if isinstance(chosen_app, dict) else chosen_app,
148-
authority=_select_options([
154+
authority = _select_options([
149155
"https://login.microsoftonline.com/common",
150156
"https://login.microsoftonline.com/organizations",
151157
"https://login.microsoftonline.com/microsoft.onmicrosoft.com",
@@ -154,20 +160,32 @@ def main():
154160
],
155161
header="Input authority (Note that MSA-PT apps would NOT use the /common authority)",
156162
accept_nonempty_string=True,
157-
),
158163
)
164+
if isinstance(chosen_app, dict) and "client_secret" in chosen_app:
165+
app = msal.ConfidentialClientApplication(
166+
chosen_app["client_id"],
167+
client_credential=chosen_app["client_secret"],
168+
authority=authority,
169+
)
170+
else:
171+
app = msal.PublicClientApplication(
172+
chosen_app["client_id"] if isinstance(chosen_app, dict) else chosen_app,
173+
authority=authority,
174+
)
159175
if _input_boolean("Enable MSAL Python's DEBUG log?"):
160176
logging.basicConfig(level=logging.DEBUG)
161177
while True:
162-
func = _select_options([
178+
func = _select_options(list(filter(None, [
163179
acquire_token_silent,
164180
acquire_token_interactive,
165181
acquire_token_by_username_password,
166182
acquire_ssh_cert_silently,
167183
acquire_ssh_cert_interactive,
168184
remove_account,
185+
acquire_token_for_client if isinstance(
186+
app, msal.ConfidentialClientApplication) else None,
169187
exit,
170-
], option_renderer=lambda f: f.__doc__, header="MSAL Python APIs:")
188+
])), option_renderer=lambda f: f.__doc__, header="MSAL Python APIs:")
171189
try:
172190
func(app)
173191
except KeyboardInterrupt: # Useful for bailing out a stuck interactive flow

0 commit comments

Comments
 (0)