@@ -32,6 +32,7 @@ class ExecutionMetrics:
     bytes_processed: int = 0
     execution_secs: float = 0
     query_char_count: int = 0
+    total_rows: int = 0

     def count_job_stats(
         self,
@@ -46,11 +47,13 @@ def count_job_stats(
             query_char_count = len(getattr(row_iterator, "query", ""))
             slot_millis = getattr(row_iterator, "slot_millis", 0)
             exec_seconds = 0.0
+            total_rows = getattr(row_iterator, "total_rows", 0) or 0

             self.execution_count += 1
             self.query_char_count += query_char_count
             self.bytes_processed += bytes_processed
             self.slot_millis += slot_millis
+            self.total_rows += total_rows

         elif query_job.configuration.dry_run:
             query_char_count = len(query_job.query)
@@ -59,39 +62,43 @@ def count_job_stats(
             bytes_processed = 0
             slot_millis = 0
             exec_seconds = 0.0
+            total_rows = 0

         elif (stats := get_performance_stats(query_job)) is not None:
-            query_char_count, bytes_processed, slot_millis, exec_seconds = stats
+            (
+                query_char_count,
+                bytes_processed,
+                slot_millis,
+                exec_seconds,
+                total_rows,
+            ) = stats
             self.execution_count += 1
             self.query_char_count += query_char_count
             self.bytes_processed += bytes_processed
             self.slot_millis += slot_millis
             self.execution_secs += exec_seconds
-            write_stats_to_disk(
-                query_char_count=query_char_count,
-                bytes_processed=bytes_processed,
-                slot_millis=slot_millis,
-                exec_seconds=exec_seconds,
-            )
+            self.total_rows += total_rows

         else:
             # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
             bytes_processed = 0
             query_char_count = 0
             slot_millis = 0
             exec_seconds = 0
+            total_rows = 0

         write_stats_to_disk(
             query_char_count=query_char_count,
             bytes_processed=bytes_processed,
             slot_millis=slot_millis,
             exec_seconds=exec_seconds,
+            total_rows=total_rows,
         )


 def get_performance_stats(
     query_job: bigquery.QueryJob,
-) -> Optional[Tuple[int, int, int, float]]:
+) -> Optional[Tuple[int, int, int, float, int]]:
     """Parse the query job for performance stats.

     Return None if the stats do not reflect real work done in bigquery.
@@ -114,13 +121,17 @@ def get_performance_stats(
     execution_secs = (query_job.ended - query_job.created).total_seconds()
     query_char_count = len(query_job.query)

+    # Extract total rows from query job
+    total_rows = getattr(query_job, "total_rows", 0) or 0
+
     return (
         query_char_count,
         # Not every job populates these. For example, slot_millis is missing
         # from queries that came from cached results.
         bytes_processed if bytes_processed else 0,
         slot_millis if slot_millis else 0,
         execution_secs,
+        total_rows,
     )


@@ -130,6 +141,7 @@ def write_stats_to_disk(
     bytes_processed: int,
     slot_millis: int,
     exec_seconds: float,
+    total_rows: Optional[int] = None,
 ):
     """For pytest runs only, log information about the query job
     to a file in order to create a performance report.
@@ -164,3 +176,9 @@ def write_stats_to_disk(
     bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
     with open(bytes_file, "a") as f:
         f.write(str(bytes_processed) + "\n")
+
+    # store total rows
+    if total_rows is not None:
+        total_rows_file = os.path.join(current_directory, test_name + ".totalrows")
+        with open(total_rows_file, "a") as f:
+            f.write(str(total_rows) + "\n")
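For context, write_stats_to_disk appends one value per executed query to a per-test file, so the new ".totalrows" files can be summed after a pytest run to feed a performance report. The sketch below shows one way to do that; summarize_total_rows and the stats directory argument are illustrative names, not part of this change.

# Hypothetical sketch: aggregate the ".totalrows" files that
# write_stats_to_disk appends to during a pytest run. The function name and
# directory layout are assumptions for illustration only.
import pathlib


def summarize_total_rows(stats_dir: str) -> dict:
    """Return a mapping of test name -> total rows across its logged queries."""
    summary = {}
    for path in pathlib.Path(stats_dir).glob("*.totalrows"):
        # Each line holds the row count reported for one executed query.
        counts = [int(line) for line in path.read_text().splitlines() if line.strip()]
        summary[path.stem] = sum(counts)
    return summary


if __name__ == "__main__":
    for test_name, rows in summarize_total_rows(".").items():
        print(f"{test_name}: {rows} total rows")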