From 334f0a3ee95c001cdfddc503515d1d1ad1ecbe17 Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Wed, 3 Nov 2021 16:00:45 +0000 Subject: [PATCH 1/6] implement API requests with aiohttp --- iam_groups_authn/sql_admin.py | 2 +- iam_groups_authn/sync.py | 102 +++++++++++++++++++++++----------- requirements.txt | 2 +- tests/requirements-test.txt | 1 - 4 files changed, 73 insertions(+), 34 deletions(-) diff --git a/iam_groups_authn/sql_admin.py b/iam_groups_authn/sql_admin.py index 30c5e7e..02a02fa 100644 --- a/iam_groups_authn/sql_admin.py +++ b/iam_groups_authn/sql_admin.py @@ -73,7 +73,7 @@ async def add_missing_db_users( missing_db_users = get_users_to_add(iam_users, db_users) # add missing users to database instance for user in missing_db_users: - user_service.insert_db_user( + await user_service.insert_db_user( user, InstanceConnectionName(*instance_connection_name.split(":")) ) return missing_db_users diff --git a/iam_groups_authn/sync.py b/iam_groups_authn/sync.py index 553ea29..54dd099 100644 --- a/iam_groups_authn/sync.py +++ b/iam_groups_authn/sync.py @@ -14,13 +14,13 @@ # sync.py contains functions for syncing IAM groups with Cloud SQL instances +import asyncio from google.auth import iam from google.auth.transport.requests import Request from google.oauth2 import service_account -from googleapiclient.discovery import build -from googleapiclient.errors import HttpError from iam_groups_authn.mysql import mysql_username -from iam_groups_authn.utils import async_wrap +import json +from aiohttp import ClientSession # URI for OAuth2 credentials TOKEN_URI = "https://accounts.google.com/o/oauth2/token" @@ -36,9 +36,11 @@ def __init__(self, creds): creds: OAuth2 credentials to call admin APIs. """ self.creds = creds + self.client_session = ClientSession( + headers={"Content-Type": "application/json"} + ) - @async_wrap - def get_group_members(self, group): + async def get_group_members(self, group): """Get all members of an IAM group. Given an IAM group, get all members (groups or users) that belong to the @@ -51,21 +53,34 @@ def get_group_members(self, group): members: List of all members (groups or users) that belong to the IAM group. """ # build service to call Admin SDK Directory API - service = build("admin", "directory_v1", credentials=self.creds) + updated_creds = get_credentials( + self.creds, + scopes=[ + "https://www.googleapis.com/auth/admin.directory.group.member.readonly" + ], + ) + + headers = { + "Authorization": f"Bearer {updated_creds.token}", + } + + url = f"https://admin.googleapis.com/admin/directory/v1/groups/{group}/members" try: # call the Admin SDK Directory API - results = service.members().list(groupKey=group).execute() + resp = await self.client_session.get( + url, headers=headers, raise_for_status=True + ) + results = json.loads(await resp.text()) members = results.get("members", []) return members # handle errors if IAM group does not exist etc. - except HttpError as e: - raise HttpError( + except Exception as e: + raise Exception( f"Error: Failed to get IAM members of IAM group `{group}`. Verify group exists and is configured correctly." ) from e - @async_wrap - def get_db_users(self, instance_connection_name): + async def get_db_users(self, instance_connection_name): """Get all database users of a Cloud SQL instance. Given a database instance and a Google Cloud project, get all the database @@ -79,17 +94,25 @@ def get_db_users(self, instance_connection_name): Returns: users: List of all database users that belong to the Cloud SQL instance. """ - # build service to call SQL Admin API - service = build("sqladmin", "v1beta4", credentials=self.creds) + # build request to SQL Admin API + if not self.creds.valid: + request = Request() + self.creds.refresh(request) + + headers = { + "Authorization": f"Bearer {self.creds.token}", + } + + project = instance_connection_name.project + instance = instance_connection_name.instance + url = f"https://sqladmin.googleapis.com/sql/v1beta4/projects/{project}/instances/{instance}/users" + try: - results = ( - service.users() - .list( - project=instance_connection_name.project, - instance=instance_connection_name.instance, - ) - .execute() + # call the SQL Admin API + resp = await self.client_session.get( + url, headers=headers, raise_for_status=True ) + results = json.loads(await resp.text()) users = results.get("items", []) return users except Exception as e: @@ -97,7 +120,7 @@ def get_db_users(self, instance_connection_name): f"Error: Failed to get the database users for instance `{instance_connection_name}`. Verify instance connection name and instance details." ) from e - def insert_db_user(self, user_email, instance_connection_name): + async def insert_db_user(self, user_email, instance_connection_name): """Create DB user from IAM user. Given an IAM user's email, insert the IAM user as a DB user for Cloud SQL instance. @@ -108,18 +131,24 @@ def insert_db_user(self, user_email, instance_connection_name): (e.g. InstanceConnectionName(project='my-project', region='my-region', instance='my-instance')) """ - # build service to call SQL Admin API - service = build("sqladmin", "v1beta4", credentials=self.creds) + # build request to SQL Admin API + if not self.creds.valid: + request = Request() + self.creds.refresh(request) + + headers = { + "Authorization": f"Bearer {self.creds.token}", + } + + project = instance_connection_name.project + instance = instance_connection_name.instance + url = f"https://sqladmin.googleapis.com/sql/v1beta4/projects/{project}/instances/{instance}/users" user = {"name": user_email, "type": "CLOUD_IAM_USER"} + try: - results = ( - service.users() - .insert( - project=instance_connection_name.project, - instance=instance_connection_name.instance, - body=user, - ) - .execute() + # call the SQL Admin API + resp = await self.client_session.post( + url, headers=headers, json=user, raise_for_status=True ) return except Exception as e: @@ -127,6 +156,17 @@ def insert_db_user(self, user_email, instance_connection_name): f"Error: Failed to add IAM user `{user_email}` to Cloud SQL database instance `{instance_connection_name.instance}`." ) from e + def __del__(self): + """Deconstructor for UserService to close ClientSession and have + graceful exit. + """ + + async def deconstruct(): + if not self.client_session.closed: + await self.client_session.close() + + asyncio.run_coroutine_threadsafe(deconstruct(), loop=asyncio.get_event_loop()) + async def get_users_with_roles(role_service, role): """Get mapping of group role grants on DB users. diff --git a/requirements.txt b/requirements.txt index 0fc659c..2bddfba 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ quart==0.15.1 hypercorn==0.11.2 SQLAlchemy==1.4.22 -google-api-python-client==2.19.1 google-auth==2.0.0 PyMySQL==1.0.2 cloud-sql-python-connector[pymysql]==0.4.1 google-cloud-logging==2.6.0 +aiohttp==3.8.0 diff --git a/tests/requirements-test.txt b/tests/requirements-test.txt index 2f0f677..f457f19 100644 --- a/tests/requirements-test.txt +++ b/tests/requirements-test.txt @@ -1,7 +1,6 @@ pytest==6.2.5 quart==0.15.1 SQLAlchemy==1.4.22 -google-api-python-client==2.19.1 google-auth==2.0.0 PyMySQL==1.0.2 cloud-sql-python-connector[pymysql]==0.4.1 From e6d3079922bd2aadad5864c90e02433f46b4ec19 Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Wed, 3 Nov 2021 17:19:38 +0000 Subject: [PATCH 2/6] fix scoped credentials --- app.py | 2 +- iam_groups_authn/sync.py | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/app.py b/app.py index 23cd4e2..5dcc449 100644 --- a/app.py +++ b/app.py @@ -97,7 +97,7 @@ async def run_groups_authn(): updated_creds = get_credentials(creds, SCOPES) # create UserService object for API calls - user_service = UserService(creds) + user_service = UserService(updated_creds) # keep track of IAM group and database instance tasks group_tasks = {} diff --git a/iam_groups_authn/sync.py b/iam_groups_authn/sync.py index 54dd099..90afcc9 100644 --- a/iam_groups_authn/sync.py +++ b/iam_groups_authn/sync.py @@ -53,15 +53,12 @@ async def get_group_members(self, group): members: List of all members (groups or users) that belong to the IAM group. """ # build service to call Admin SDK Directory API - updated_creds = get_credentials( - self.creds, - scopes=[ - "https://www.googleapis.com/auth/admin.directory.group.member.readonly" - ], - ) + if not self.creds.valid: + request = Request() + self.creds.refresh(request) headers = { - "Authorization": f"Bearer {updated_creds.token}", + "Authorization": f"Bearer {self.creds.token}", } url = f"https://admin.googleapis.com/admin/directory/v1/groups/{group}/members" From 7f6d89fecc644594c833a48516107905d2930409 Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Wed, 3 Nov 2021 17:25:48 +0000 Subject: [PATCH 3/6] validate credentials before use --- iam_groups_authn/mysql.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/iam_groups_authn/mysql.py b/iam_groups_authn/mysql.py index a7f692f..047eb40 100644 --- a/iam_groups_authn/mysql.py +++ b/iam_groups_authn/mysql.py @@ -19,6 +19,7 @@ from google.cloud.sql.connector import connector from google.cloud.sql.connector.instance_connection_manager import IPTypes from iam_groups_authn.utils import async_wrap +from google.auth.transport.requests import Request def mysql_username(iam_email): @@ -137,6 +138,11 @@ def init_connection_engine(instance_connection_name, creds, ip_type=IPTypes.PUBL "pool_timeout": 30, # 30 seconds "pool_recycle": 1800, # 30 minutes } + # refresh credentials if need be + if not creds.valid: + request = Request() + creds.refresh(request) + # service account email to access DB, mysql truncates usernames to before '@' sign service_account_email = mysql_username(creds.service_account_email) From 73e1c5f1ebeaa575457223cbe61f38ad0d3cfa32 Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Wed, 3 Nov 2021 17:27:54 +0000 Subject: [PATCH 4/6] update comment wording --- iam_groups_authn/mysql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iam_groups_authn/mysql.py b/iam_groups_authn/mysql.py index 047eb40..a3dea7b 100644 --- a/iam_groups_authn/mysql.py +++ b/iam_groups_authn/mysql.py @@ -138,7 +138,7 @@ def init_connection_engine(instance_connection_name, creds, ip_type=IPTypes.PUBL "pool_timeout": 30, # 30 seconds "pool_recycle": 1800, # 30 minutes } - # refresh credentials if need be + # refresh credentials if not valid if not creds.valid: request = Request() creds.refresh(request) From 19939a6782b89f4cd0c04a9b318e164f47ef2e9e Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Wed, 3 Nov 2021 18:19:39 +0000 Subject: [PATCH 5/6] Add helper function for aiohttp requests --- iam_groups_authn/sync.py | 70 +++++++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 30 deletions(-) diff --git a/iam_groups_authn/sync.py b/iam_groups_authn/sync.py index 90afcc9..4bfd740 100644 --- a/iam_groups_authn/sync.py +++ b/iam_groups_authn/sync.py @@ -53,20 +53,12 @@ async def get_group_members(self, group): members: List of all members (groups or users) that belong to the IAM group. """ # build service to call Admin SDK Directory API - if not self.creds.valid: - request = Request() - self.creds.refresh(request) - - headers = { - "Authorization": f"Bearer {self.creds.token}", - } - url = f"https://admin.googleapis.com/admin/directory/v1/groups/{group}/members" try: # call the Admin SDK Directory API - resp = await self.client_session.get( - url, headers=headers, raise_for_status=True + resp = await authenticated_request( + self.creds, url, self.client_session, "GET" ) results = json.loads(await resp.text()) members = results.get("members", []) @@ -92,22 +84,14 @@ async def get_db_users(self, instance_connection_name): users: List of all database users that belong to the Cloud SQL instance. """ # build request to SQL Admin API - if not self.creds.valid: - request = Request() - self.creds.refresh(request) - - headers = { - "Authorization": f"Bearer {self.creds.token}", - } - project = instance_connection_name.project instance = instance_connection_name.instance url = f"https://sqladmin.googleapis.com/sql/v1beta4/projects/{project}/instances/{instance}/users" try: # call the SQL Admin API - resp = await self.client_session.get( - url, headers=headers, raise_for_status=True + resp = await authenticated_request( + self.creds, url, self.client_session, "GET" ) results = json.loads(await resp.text()) users = results.get("items", []) @@ -129,14 +113,6 @@ async def insert_db_user(self, user_email, instance_connection_name): instance='my-instance')) """ # build request to SQL Admin API - if not self.creds.valid: - request = Request() - self.creds.refresh(request) - - headers = { - "Authorization": f"Bearer {self.creds.token}", - } - project = instance_connection_name.project instance = instance_connection_name.instance url = f"https://sqladmin.googleapis.com/sql/v1beta4/projects/{project}/instances/{instance}/users" @@ -144,8 +120,8 @@ async def insert_db_user(self, user_email, instance_connection_name): try: # call the SQL Admin API - resp = await self.client_session.post( - url, headers=headers, json=user, raise_for_status=True + resp = await authenticated_request( + self.creds, url, self.client_session, "POST", body=user ) return except Exception as e: @@ -165,6 +141,40 @@ async def deconstruct(): asyncio.run_coroutine_threadsafe(deconstruct(), loop=asyncio.get_event_loop()) +async def authenticated_request(creds, url, client_session, request_type, body=None): + """Helper function to build authenticated aiohttp requests. + + Args: + creds: OAuth2 credentials for authorizing requests. + url: URL for aiohttp request. + client_session: aiohttp ClientSession object. + request_type: String determining request type ('GET' or 'POST'). + body: (optional) JSON body for request. + + Return: + Result from aiohttp request. + """ + if not creds.valid: + request = Request() + creds.refresh(request) + + headers = { + "Authorization": f"Bearer {creds.token}", + } + + if request_type.upper() == "GET": + return await client_session.get(url, headers=headers, raise_for_status=True) + elif request_type.upper() == "POST": + return await client_session.post( + url, headers=headers, json=body, raise_for_status=True + ) + else: + raise ValueError( + "Request type not recognized! " + "Please verify request is type 'GET' or 'POST'." + ) + + async def get_users_with_roles(role_service, role): """Get mapping of group role grants on DB users. From f5399488f22f8487f8007ebc6f44127045c06f10 Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Wed, 3 Nov 2021 18:51:43 +0000 Subject: [PATCH 6/6] Substitute magic string for enum --- iam_groups_authn/sync.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/iam_groups_authn/sync.py b/iam_groups_authn/sync.py index 4bfd740..7df4d15 100644 --- a/iam_groups_authn/sync.py +++ b/iam_groups_authn/sync.py @@ -21,6 +21,7 @@ from iam_groups_authn.mysql import mysql_username import json from aiohttp import ClientSession +from enum import Enum # URI for OAuth2 credentials TOKEN_URI = "https://accounts.google.com/o/oauth2/token" @@ -58,7 +59,7 @@ async def get_group_members(self, group): try: # call the Admin SDK Directory API resp = await authenticated_request( - self.creds, url, self.client_session, "GET" + self.creds, url, self.client_session, RequestType.get ) results = json.loads(await resp.text()) members = results.get("members", []) @@ -91,7 +92,7 @@ async def get_db_users(self, instance_connection_name): try: # call the SQL Admin API resp = await authenticated_request( - self.creds, url, self.client_session, "GET" + self.creds, url, self.client_session, RequestType.get ) results = json.loads(await resp.text()) users = results.get("items", []) @@ -121,7 +122,7 @@ async def insert_db_user(self, user_email, instance_connection_name): try: # call the SQL Admin API resp = await authenticated_request( - self.creds, url, self.client_session, "POST", body=user + self.creds, url, self.client_session, RequestType.post, body=user ) return except Exception as e: @@ -141,6 +142,13 @@ async def deconstruct(): asyncio.run_coroutine_threadsafe(deconstruct(), loop=asyncio.get_event_loop()) +class RequestType(Enum): + """Helper class for supported aiohttp request types.""" + + get = 1 + post = 2 + + async def authenticated_request(creds, url, client_session, request_type, body=None): """Helper function to build authenticated aiohttp requests. @@ -148,7 +156,7 @@ async def authenticated_request(creds, url, client_session, request_type, body=N creds: OAuth2 credentials for authorizing requests. url: URL for aiohttp request. client_session: aiohttp ClientSession object. - request_type: String determining request type ('GET' or 'POST'). + request_type: RequestType enum determining request type. body: (optional) JSON body for request. Return: @@ -162,16 +170,15 @@ async def authenticated_request(creds, url, client_session, request_type, body=N "Authorization": f"Bearer {creds.token}", } - if request_type.upper() == "GET": + if request_type == RequestType.get: return await client_session.get(url, headers=headers, raise_for_status=True) - elif request_type.upper() == "POST": + elif request_type == RequestType.post: return await client_session.post( url, headers=headers, json=body, raise_for_status=True ) else: raise ValueError( - "Request type not recognized! " - "Please verify request is type 'GET' or 'POST'." + "Request type not recognized! " "Please verify RequestType is valid." )