1
1
from __future__ import annotations
2
2
3
3
import base64
4
- import json
5
4
from enum import Enum
6
5
from typing import Optional
7
6
8
7
import botocore .session
9
- import urllib3
10
8
from botocore .auth import SigV4Auth
11
9
from botocore .awsrequest import AWSRequest
12
10
from botocore .credentials import Credentials , ReadOnlyCredentials
@@ -24,60 +22,11 @@ def _authorization_header(client_id: str, client_secret: str) -> str:
24
22
str: Base64 encoded Authorization header
25
23
"""
26
24
auth_string = f"{ client_id } :{ client_secret } "
27
- encoded_auth_string = base64 .b64encode (auth_string .encode ("utf-8" )).decode ("utf-8" )
25
+ encoded_auth_bytes = base64 .b64encode (auth_string )
26
+ encoded_auth_string = encoded_auth_bytes .decode ("utf-8" )
28
27
return f"Basic { encoded_auth_string } "
29
28
30
29
31
- def _get_token (response : dict ) -> str :
32
- """
33
- Gets the token from the response
34
-
35
- Args:
36
- response (dict): Response from the authentication endpoint
37
-
38
- Returns:
39
- str: Token
40
- """
41
- if "access_token" in response :
42
- return response ["access_token" ]
43
- elif "id_token" in response :
44
- return response ["id_token" ]
45
- else :
46
- raise Exception ("Unable to get token from response" )
47
-
48
-
49
- def _request_access_token (auth_endpoint : str , body : dict , headers : dict ) -> str :
50
- """
51
- Gets the token from the Auth0 authentication endpoint
52
-
53
- Args:
54
- client_id (str): Client ID
55
- client_secret (str): Client Secret
56
- audience (str): Audience
57
- auth_endpoint (str): Auth0 authentication endpoint
58
-
59
- Returns:
60
- str: Token
61
- """
62
- headers ["Content-Type" ] = "application/x-www-form-urlencoded"
63
-
64
- http = urllib3 .PoolManager ()
65
-
66
- if isinstance (body , dict ):
67
- json_body = json .dumps (body )
68
- elif isinstance (body , str ):
69
- json_body = body
70
-
71
- try :
72
- response = http .request ("POST" , auth_endpoint , headers = headers , body = json_body )
73
- response = response .json ()
74
- return _get_token (response )
75
- except (urllib3 .exceptions .RequestError , urllib3 .exceptions .HTTPError ) as error :
76
- # If there is an error with the request, handle it here
77
- # REVIEW: CREATE A CUSTOM EXCEPTION FOR THIS
78
- raise Exception (error ) from error
79
-
80
-
81
30
class AWSServicePrefix (Enum ):
82
31
"""
83
32
AWS Service Prefixes - Enumerations of the supported service proxy types
@@ -141,7 +90,6 @@ def __init__(
141
90
secret_key : Optional [str ] = None ,
142
91
token : Optional [str ] = None ,
143
92
):
144
-
145
93
self .service = service .value
146
94
self .region = region
147
95
self .method = method
@@ -180,7 +128,6 @@ def __call__(self):
180
128
181
129
182
130
class JWTAuth :
183
-
184
131
def __init__ (
185
132
self ,
186
133
client_id : str ,
@@ -190,7 +137,6 @@ def __init__(
190
137
audience : Optional [str ] = None ,
191
138
scope : Optional [list ] = None ,
192
139
):
193
-
194
140
self .client_id = client_id
195
141
self .client_secret = client_secret
196
142
self .auth_endpoint = auth_endpoint .removesuffix ("/" )
@@ -205,22 +151,19 @@ def __init__(
205
151
}
206
152
207
153
if self .provider == AuthProvider .COGNITO .value :
208
-
209
154
encoded_auth_string = _authorization_header (self .client_id , self .client_secret )
210
155
self .headers ["Authorization" ] = f"Basic { encoded_auth_string } "
211
156
self .body ["grant_type" ] = "client_credentials"
212
157
if self .scope :
213
158
self .body ["scope" ] = " " .join (self .scope )
214
159
215
160
if self .provider == AuthProvider .AUTH0 .value :
216
-
217
161
self .body ["client_id" ] = self .client_id
218
162
self .body ["client_secret" ] = self .client_secret
219
163
self .body ["grant_type" ] = "client_credentials"
220
164
self .body ["audience" ] = self .audience
221
165
222
166
if self .provider == AuthProvider .OKTA .value :
223
-
224
167
encoded_auth_string = _authorization_header (self .client_id , self .client_secret )
225
168
self .headers ["Accept" ] = "application/json"
226
169
self .headers ["Authorization" ] = f"Basic { encoded_auth_string } "
0 commit comments