1414
1515import datetime
1616import os
17+ import random
1718import uuid
1819
1920from google .api_core import client_options
2021import google .api_core .exceptions
2122import google .auth
2223from google .cloud import bigquery
2324from google .cloud import bigquery_datatransfer
25+ from google .cloud import pubsub_v1
2426import pytest
2527
2628
29+ RESOURCE_PREFIX = "python_bigquery_datatransfer_samples_snippets"
30+ RESOURCE_DATE_FORMAT = "%Y%m%d%H%M%S"
31+ RESOURCE_DATE_LENGTH = 4 + 2 + 2 + 2 + 2 + 2
32+
33+
34+ def resource_prefix () -> str :
35+ timestamp = datetime .datetime .utcnow ().strftime (RESOURCE_DATE_FORMAT )
36+ random_string = hex (random .randrange (1000000 ))[2 :]
37+ return f"{ RESOURCE_PREFIX } _{ timestamp } _{ random_string } "
38+
39+
40+ def resource_name_to_date (resource_name : str ):
41+ start_date = len (RESOURCE_PREFIX ) + 1
42+ date_string = resource_name [start_date : start_date + RESOURCE_DATE_LENGTH ]
43+ parsed_date = datetime .datetime .strptime (date_string , RESOURCE_DATE_FORMAT )
44+ return parsed_date
45+
46+
47+ @pytest .fixture (scope = "session" , autouse = True )
48+ def cleanup_pubsub_topics (pubsub_client : pubsub_v1 .PublisherClient , project_id ):
49+ yesterday = datetime .datetime .utcnow () - datetime .timedelta (days = 1 )
50+ for topic in pubsub_client .list_topics (project = f"projects/{ project_id } " ):
51+ topic_id = topic .name .split ("/" )[- 1 ]
52+ if (
53+ topic_id .startswith (RESOURCE_PREFIX )
54+ and resource_name_to_date (topic_id ) < yesterday
55+ ):
56+ pubsub_client .delete_topic (topic = topic .name )
57+
58+
2759def temp_suffix ():
2860 now = datetime .datetime .now ()
2961 return f"{ now .strftime ('%Y%m%d%H%M%S' )} _{ uuid .uuid4 ().hex [:8 ]} "
@@ -35,6 +67,21 @@ def bigquery_client(default_credentials):
3567 return bigquery .Client (credentials = credentials , project = project_id )
3668
3769
70+ @pytest .fixture (scope = "session" )
71+ def pubsub_client (default_credentials ):
72+ credentials , _ = default_credentials
73+ return pubsub_v1 .PublisherClient (credentials = credentials )
74+
75+
76+ @pytest .fixture (scope = "session" )
77+ def pubsub_topic (pubsub_client : pubsub_v1 .PublisherClient , project_id ):
78+ topic_id = resource_prefix ()
79+ topic_path = pubsub_v1 .PublisherClient .topic_path (project_id , topic_id )
80+ pubsub_client .create_topic (name = topic_path )
81+ yield topic_path
82+ pubsub_client .delete_topic (topic = topic_path )
83+
84+
3885@pytest .fixture (scope = "session" )
3986def dataset_id (bigquery_client , project_id ):
4087 dataset_id = f"bqdts_{ temp_suffix ()} "
@@ -56,10 +103,10 @@ def project_id():
56103@pytest .fixture (scope = "session" )
57104def service_account_name (default_credentials ):
58105 credentials , _ = default_credentials
59- # Note: this property is not available when running with user account
60- # credentials, but only service account credentials are used in our test
61- # infrastructure .
62- return credentials . service_account_email
106+ # The service_account_email attribute is not available when running with
107+ # user account credentials, but should be available when running from our
108+ # continuous integration tests .
109+ return getattr ( credentials , " service_account_email" , None )
63110
64111
65112@pytest .fixture (scope = "session" )
0 commit comments