4
4
# --------------------------------------------------------------------------------------------
5
5
6
6
"""
7
- Credentials defined in this module are alternative implementations of credentials provided by Azure Identity.
8
-
9
- These credentials implement azure.core.credentials.TokenCredential by exposing `get_token` method for Track 2
10
- SDK invocation.
11
-
12
- If you want to implement your own credential, the credential must also expose `get_token` method.
13
-
14
- `get_token` method takes `scopes` as positional arguments and other optional `kwargs`, such as `claims`, `data`.
15
- The return value should be a named tuple containing two elements: token (str), expires_on (int). You may simply use
16
- azure.cli.core.auth.util.AccessToken to build the return value. See below credentials as examples.
7
+ Credentials to acquire tokens from MSAL.
17
8
"""
18
9
19
10
from knack .log import get_logger
22
13
ManagedIdentityClient , SystemAssignedManagedIdentity )
23
14
24
15
from .constants import AZURE_CLI_CLIENT_ID
25
- from .util import check_result , build_sdk_access_token
16
+ from .util import check_result
26
17
27
18
logger = get_logger (__name__ )
28
19
29
20
30
21
class UserCredential : # pylint: disable=too-few-public-methods
31
22
32
23
def __init__ (self , client_id , username , ** kwargs ):
33
- """User credential implementing get_token interface.
24
+ """User credential wrapping msal.application.PublicClientApplication
34
25
35
26
:param client_id: Client ID of the CLI.
36
27
:param username: The username for user credential.
@@ -52,14 +43,16 @@ def __init__(self, client_id, username, **kwargs):
52
43
53
44
self ._account = accounts [0 ]
54
45
55
- def get_token (self , * scopes , claims = None , ** kwargs ):
56
- # scopes = ['https://pas.windows.net/CheckMyAccess/Linux/.default']
57
- logger .debug ("UserCredential.get_token: scopes=%r, claims=%r, kwargs=%r" , scopes , claims , kwargs )
46
+ def acquire_token (self , scopes , claims = None , ** kwargs ):
47
+ # scopes must be a list.
48
+ # For acquiring SSH certificate, scopes is ['https://pas.windows.net/CheckMyAccess/Linux/.default']
49
+ # kwargs is already sanitized by CredentialAdaptor, so it can be safely passed to MSAL
50
+ logger .debug ("UserCredential.acquire_token: scopes=%r, claims=%r, kwargs=%r" , scopes , claims , kwargs )
58
51
59
52
if claims :
60
53
logger .warning ('Acquiring new access token silently for tenant %s with claims challenge: %s' ,
61
54
self ._msal_app .authority .tenant , claims )
62
- result = self ._msal_app .acquire_token_silent_with_error (list ( scopes ) , self ._account , claims_challenge = claims ,
55
+ result = self ._msal_app .acquire_token_silent_with_error (scopes , self ._account , claims_challenge = claims ,
63
56
** kwargs )
64
57
65
58
from azure .cli .core .azclierror import AuthenticationError
@@ -82,7 +75,7 @@ def get_token(self, *scopes, claims=None, **kwargs):
82
75
success_template , error_template = read_response_templates ()
83
76
84
77
result = self ._msal_app .acquire_token_interactive (
85
- list ( scopes ) , login_hint = self ._account ['username' ],
78
+ scopes , login_hint = self ._account ['username' ],
86
79
port = 8400 if self ._msal_app .authority .is_adfs else None ,
87
80
success_template = success_template , error_template = error_template , ** kwargs )
88
81
check_result (result )
@@ -91,25 +84,24 @@ def get_token(self, *scopes, claims=None, **kwargs):
91
84
# launch browser, but show the error message and `az login` command instead.
92
85
else :
93
86
raise
94
- return build_sdk_access_token ( result )
87
+ return result
95
88
96
89
97
90
class ServicePrincipalCredential : # pylint: disable=too-few-public-methods
98
91
99
92
def __init__ (self , client_id , client_credential , ** kwargs ):
100
- """Service principal credential implementing get_token interface .
93
+ """Service principal credential wrapping msal.application.ConfidentialClientApplication .
101
94
102
95
:param client_id: The service principal's client ID.
103
96
:param client_credential: client_credential that will be passed to MSAL.
104
97
"""
105
- self ._msal_app = ConfidentialClientApplication (client_id , client_credential , ** kwargs )
106
-
107
- def get_token (self , * scopes , ** kwargs ):
108
- logger .debug ("ServicePrincipalCredential.get_token: scopes=%r, kwargs=%r" , scopes , kwargs )
98
+ self ._msal_app = ConfidentialClientApplication (client_id , client_credential = client_credential , ** kwargs )
109
99
110
- result = self ._msal_app .acquire_token_for_client (list (scopes ), ** kwargs )
100
+ def acquire_token (self , scopes , ** kwargs ):
101
+ logger .debug ("ServicePrincipalCredential.acquire_token: scopes=%r, kwargs=%r" , scopes , kwargs )
102
+ result = self ._msal_app .acquire_token_for_client (scopes , ** kwargs )
111
103
check_result (result )
112
- return build_sdk_access_token ( result )
104
+ return result
113
105
114
106
115
107
class CloudShellCredential : # pylint: disable=too-few-public-methods
@@ -126,12 +118,11 @@ def __init__(self):
126
118
# token_cache=...
127
119
)
128
120
129
- def get_token (self , * scopes , ** kwargs ):
130
- logger .debug ("CloudShellCredential.get_token: scopes=%r, kwargs=%r" , scopes , kwargs )
131
- # kwargs is already sanitized by CredentialAdaptor, so it can be safely passed to MSAL
132
- result = self ._msal_app .acquire_token_interactive (list (scopes ), prompt = "none" , ** kwargs )
121
+ def acquire_token (self , scopes , ** kwargs ):
122
+ logger .debug ("CloudShellCredential.acquire_token: scopes=%r, kwargs=%r" , scopes , kwargs )
123
+ result = self ._msal_app .acquire_token_interactive (scopes , prompt = "none" , ** kwargs )
133
124
check_result (result , scopes = scopes )
134
- return build_sdk_access_token ( result )
125
+ return result
135
126
136
127
137
128
class ManagedIdentityCredential : # pylint: disable=too-few-public-methods
@@ -143,10 +134,10 @@ def __init__(self):
143
134
import requests
144
135
self ._msal_client = ManagedIdentityClient (SystemAssignedManagedIdentity (), http_client = requests .Session ())
145
136
146
- def get_token (self , * scopes , ** kwargs ):
147
- logger .debug ("ManagedIdentityCredential.get_token : scopes=%r, kwargs=%r" , scopes , kwargs )
137
+ def acquire_token (self , scopes , ** kwargs ):
138
+ logger .debug ("ManagedIdentityCredential.acquire_token : scopes=%r, kwargs=%r" , scopes , kwargs )
148
139
149
140
from .util import scopes_to_resource
150
141
result = self ._msal_client .acquire_token_for_client (resource = scopes_to_resource (scopes ))
151
142
check_result (result )
152
- return build_sdk_access_token ( result )
143
+ return result
0 commit comments