diff --git a/pyatlan/client/atlan.py b/pyatlan/client/atlan.py index 00d7a8a82..25d0e099e 100644 --- a/pyatlan/client/atlan.py +++ b/pyatlan/client/atlan.py @@ -360,7 +360,13 @@ def dq_template_config_cache(self) -> DQTemplateConfigCache: return self._dq_template_config_cache @classmethod - def from_token_guid(cls, guid: str) -> AtlanClient: + def from_token_guid( + cls, + guid: str, + base_url: Optional[str] = None, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, + ) -> AtlanClient: """ Create an AtlanClient instance using an API token GUID. @@ -371,15 +377,20 @@ def from_token_guid(cls, guid: str) -> AtlanClient: 4. Returns a new AtlanClient authenticated with the resolved token :param guid: API token GUID to resolve + :param base_url: Optional base URL for the Atlan service(overrides ATLAN_BASE_URL environment variable) + :param client_id: Optional client ID for authentication (overrides CLIENT_ID environment variable) + :param client_secret: Optional client secret for authentication (overrides CLIENT_SECRET environment variable) :returns: a new client instance authenticated with the resolved token :raises: ErrorCode.UNABLE_TO_ESCALATE_WITH_PARAM: If any step in the token resolution fails """ - base_url = os.environ.get("ATLAN_BASE_URL", "INTERNAL") + final_base_url = base_url or os.environ.get("ATLAN_BASE_URL", "INTERNAL") # Step 1: Initialize base client and get Atlan-Argo credentials # Note: Using empty api_key as we're bootstrapping authentication - client = AtlanClient(base_url=base_url, api_key="") - client_info = client.impersonate._get_client_info() + client = AtlanClient(base_url=final_base_url, api_key="") + client_info = client.impersonate._get_client_info( + client_id=client_id, client_secret=client_secret + ) # Prepare credentials for Atlan-Argo token request argo_credentials = { @@ -393,7 +404,7 @@ def from_token_guid(cls, guid: str) -> AtlanClient: try: raw_json = client._call_api(GET_TOKEN, request_obj=argo_credentials) argo_token = AccessTokenResponse(**raw_json).access_token - temp_argo_client = AtlanClient(base_url=base_url, api_key=argo_token) + temp_argo_client = AtlanClient(base_url=final_base_url, api_key=argo_token) except AtlanError as atlan_err: raise ErrorCode.UNABLE_TO_ESCALATE_WITH_PARAM.exception_with_parameters( "Failed to obtain Atlan-Argo token" @@ -419,7 +430,7 @@ def from_token_guid(cls, guid: str) -> AtlanClient: token_api_key = AccessTokenResponse(**raw_json).access_token # Step 5: Create and return the authenticated client - return AtlanClient(base_url=base_url, api_key=token_api_key) + return AtlanClient(base_url=final_base_url, api_key=token_api_key) except AtlanError as atlan_err: raise ErrorCode.UNABLE_TO_ESCALATE_WITH_PARAM.exception_with_parameters( "Failed to obtain access token for API token" diff --git a/pyatlan/client/impersonate.py b/pyatlan/client/impersonate.py index e935ec9b7..83e8705af 100644 --- a/pyatlan/client/impersonate.py +++ b/pyatlan/client/impersonate.py @@ -66,12 +66,16 @@ def user(self, user_id: str) -> str: except AtlanError as atlan_err: raise ErrorCode.UNABLE_TO_IMPERSONATE.exception_with_parameters() from atlan_err - def _get_client_info(self) -> ClientInfo: - client_id = os.getenv("CLIENT_ID") - client_secret = os.getenv("CLIENT_SECRET") - if not client_id or not client_secret: + def _get_client_info( + self, client_id: Optional[str] = None, client_secret: Optional[str] = None + ) -> ClientInfo: + final_client_id = client_id or os.getenv("CLIENT_ID") + final_client_secret = client_secret or os.getenv("CLIENT_SECRET") + if not final_client_id or not final_client_secret: raise ErrorCode.MISSING_CREDENTIALS.exception_with_parameters() - client_info = ClientInfo(client_id=client_id, client_secret=client_secret) + client_info = ClientInfo( + client_id=final_client_id, client_secret=final_client_secret + ) return client_info def escalate(self) -> str: