diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index 94cab7cbf6..d9f1c0f295 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -247,7 +247,7 @@ def start_query_with_client(
                 api_timeout=timeout,
             )
             if metrics is not None:
-                metrics.count_job_stats()
+                metrics.count_job_stats(query=sql)
             return results_iterator, None
 
         query_job = bq_client.query(
diff --git a/bigframes/session/metrics.py b/bigframes/session/metrics.py
index 1cb561693b..b4e1458b21 100644
--- a/bigframes/session/metrics.py
+++ b/bigframes/session/metrics.py
@@ -32,29 +32,35 @@ class ExecutionMetrics:
     execution_secs: float = 0
     query_char_count: int = 0
 
-    def count_job_stats(self, query_job: Optional[bq_job.QueryJob] = None):
+    def count_job_stats(
+        self, query_job: Optional[bq_job.QueryJob] = None, query: str = ""
+    ):
         if query_job is None:
+            query_char_count = len(query)
             self.execution_count += 1
+            self.query_char_count += query_char_count
+            if LOGGING_NAME_ENV_VAR in os.environ:
+                write_stats_to_disk(query_char_count)
             return
 
         stats = get_performance_stats(query_job)
         if stats is not None:
-            bytes_processed, slot_millis, execution_secs, query_char_count = stats
+            query_char_count, bytes_processed, slot_millis, execution_secs = stats
             self.execution_count += 1
+            self.query_char_count += query_char_count
             self.bytes_processed += bytes_processed
             self.slot_millis += slot_millis
             self.execution_secs += execution_secs
-            self.query_char_count += query_char_count
             if LOGGING_NAME_ENV_VAR in os.environ:
                 # when running notebooks via pytest nbmake
                 write_stats_to_disk(
-                    bytes_processed, slot_millis, execution_secs, query_char_count
+                    query_char_count, bytes_processed, slot_millis, execution_secs
                 )
 
 
 def get_performance_stats(
     query_job: bigquery.QueryJob,
-) -> Optional[Tuple[int, int, float, int]]:
+) -> Optional[Tuple[int, int, int, float]]:
     """Parse the query job for performance stats.
 
     Return None if the stats do not reflect real work done in bigquery.
@@ -77,11 +83,14 @@ def get_performance_stats(
     execution_secs = (query_job.ended - query_job.created).total_seconds()
     query_char_count = len(query_job.query)
 
-    return bytes_processed, slot_millis, execution_secs, query_char_count
+    return query_char_count, bytes_processed, slot_millis, execution_secs
 
 
 def write_stats_to_disk(
-    bytes_processed: int, slot_millis: int, exec_seconds: float, query_char_count: int
+    query_char_count: int,
+    bytes_processed: Optional[int] = None,
+    slot_millis: Optional[int] = None,
+    exec_seconds: Optional[float] = None,
 ):
     """For pytest runs only, log information about the query job to a file
     in order to create a performance report.
@@ -95,22 +104,27 @@ def write_stats_to_disk(
     test_name = os.environ[LOGGING_NAME_ENV_VAR]
     current_directory = os.getcwd()
 
-    # store bytes processed
-    bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
-    with open(bytes_file, "a") as f:
-        f.write(str(bytes_processed) + "\n")
-
-    # store slot milliseconds
-    slot_file = os.path.join(current_directory, test_name + ".slotmillis")
-    with open(slot_file, "a") as f:
-        f.write(str(slot_millis) + "\n")
-
-    # store execution time seconds
-    exec_time_file = os.path.join(
-        current_directory, test_name + ".bq_exec_time_seconds"
-    )
-    with open(exec_time_file, "a") as f:
-        f.write(str(exec_seconds) + "\n")
+    if (
+        (bytes_processed is not None)
+        and (slot_millis is not None)
+        and (exec_seconds is not None)
+    ):
+        # store bytes processed
+        bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
+        with open(bytes_file, "a") as f:
+            f.write(str(bytes_processed) + "\n")
+
+        # store slot milliseconds
+        slot_file = os.path.join(current_directory, test_name + ".slotmillis")
+        with open(slot_file, "a") as f:
+            f.write(str(slot_millis) + "\n")
+
+        # store execution time seconds
+        exec_time_file = os.path.join(
+            current_directory, test_name + ".bq_exec_time_seconds"
+        )
+        with open(exec_time_file, "a") as f:
+            f.write(str(exec_seconds) + "\n")
 
     # store length of query
     query_char_count_file = os.path.join(
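Note on the metrics change above: count_job_stats now accepts the raw SQL text via query=sql, so the query character count is still recorded when a query runs through query_and_wait and no QueryJob is ever created, and write_stats_to_disk correspondingly treats the job-level stats (bytes, slot millis, execution seconds) as optional. A minimal sketch of the two call paths, assuming a default-constructed ExecutionMetrics (illustrative only, not part of the patch):

    from bigframes.session.metrics import ExecutionMetrics

    metrics = ExecutionMetrics()

    # Jobless path: only the SQL text is available, so just execution_count
    # and query_char_count advance (and query_char_count is written to disk
    # only when the LOGGING_NAME_ENV_VAR-named variable is set).
    metrics.count_job_stats(query="SELECT COUNT(*) FROM dataset.table")

    # Job path: a finished QueryJob carries the full stats, so bytes
    # processed, slot millis, and execution seconds advance as well.
    # metrics.count_job_stats(query_job=finished_job)  # hypothetical job

    assert metrics.execution_count == 1
    assert metrics.query_char_count == len("SELECT COUNT(*) FROM dataset.table")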
diff --git a/scripts/run_and_publish_benchmark.py b/scripts/run_and_publish_benchmark.py
index 28605a8155..402ba4d213 100644
--- a/scripts/run_and_publish_benchmark.py
+++ b/scripts/run_and_publish_benchmark.py
@@ -95,55 +95,62 @@ def collect_benchmark_result(
     if not (
         len(bytes_files)
         == len(millis_files)
-        == len(local_seconds_files)
         == len(bq_seconds_files)
-        == len(query_char_count_files)
+        <= len(query_char_count_files)
+        == len(local_seconds_files)
     ):
         raise ValueError(
             "Mismatch in the number of report files for bytes, millis, seconds and query char count."
         )
 
-    for idx in range(len(bytes_files)):
-        bytes_file = bytes_files[idx]
-        millis_file = millis_files[idx]
-        bq_seconds_file = bq_seconds_files[idx]
-        query_char_count_file = query_char_count_files[idx]
-
-        filename = bytes_file.relative_to(path).with_suffix("")
-
-        if filename != millis_file.relative_to(path).with_suffix(
-            ""
-        ) or filename != bq_seconds_file.relative_to(path).with_suffix(""):
-            raise ValueError(
-                "File name mismatch among bytes, millis, and seconds reports."
-            )
+    has_full_metrics = len(bq_seconds_files) == len(local_seconds_files)
 
+    for idx in range(len(local_seconds_files)):
+        query_char_count_file = query_char_count_files[idx]
         local_seconds_file = local_seconds_files[idx]
+        filename = query_char_count_file.relative_to(path).with_suffix("")
         if filename != local_seconds_file.relative_to(path).with_suffix(""):
             raise ValueError(
-                "File name mismatch among bytes, millis, and seconds reports."
+                "File name mismatch between query_char_count and seconds reports."
             )
 
-        with open(bytes_file, "r") as file:
+        with open(query_char_count_file, "r") as file:
             lines = file.read().splitlines()
+            query_char_count = sum(int(line) for line in lines) / iterations
             query_count = len(lines) / iterations
-            total_bytes = sum(int(line) for line in lines) / iterations
-
-        with open(millis_file, "r") as file:
-            lines = file.read().splitlines()
-            total_slot_millis = sum(int(line) for line in lines) / iterations
 
         with open(local_seconds_file, "r") as file:
             lines = file.read().splitlines()
             local_seconds = sum(float(line) for line in lines) / iterations
 
-        with open(bq_seconds_file, "r") as file:
-            lines = file.read().splitlines()
-            bq_seconds = sum(float(line) for line in lines) / iterations
+        if not has_full_metrics:
+            total_bytes = None
+            total_slot_millis = None
+            bq_seconds = None
+        else:
+            bytes_file = bytes_files[idx]
+            millis_file = millis_files[idx]
+            bq_seconds_file = bq_seconds_files[idx]
+            if (
+                filename != bytes_file.relative_to(path).with_suffix("")
+                or filename != millis_file.relative_to(path).with_suffix("")
+                or filename != bq_seconds_file.relative_to(path).with_suffix("")
+            ):
+                raise ValueError(
+                    "File name mismatch among query_char_count, bytes, millis, and seconds reports."
+                )
 
-        with open(query_char_count_file, "r") as file:
-            lines = file.read().splitlines()
-            query_char_count = sum(int(line) for line in lines) / iterations
+            with open(bytes_file, "r") as file:
+                lines = file.read().splitlines()
+                total_bytes = sum(int(line) for line in lines) / iterations
+
+            with open(millis_file, "r") as file:
+                lines = file.read().splitlines()
+                total_slot_millis = sum(int(line) for line in lines) / iterations
+
+            with open(bq_seconds_file, "r") as file:
+                lines = file.read().splitlines()
+                bq_seconds = sum(float(line) for line in lines) / iterations
 
         results_dict[str(filename)] = [
             query_count,
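The collector hunk above keys everything off has_full_metrics: a benchmark run may now produce only the query-char-count and local-seconds reports (jobless queries write no bytes/millis/BQ-seconds files), so those three metrics become None instead of failing the file-count check. A toy illustration of the per-file averaging it applies, with a made-up file name and values (each report holds one line per executed query, accumulated across iterations):

    from pathlib import Path

    iterations = 2
    report = Path("example.query_char_count")  # hypothetical report file
    report.write_text("120\n80\n120\n80\n")  # 2 iterations x 2 queries

    lines = report.read_text().splitlines()
    query_count = len(lines) / iterations  # -> 2.0 queries per run
    query_char_count = sum(int(line) for line in lines) / iterations  # -> 200.0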
@@ -194,11 +201,19 @@
         )
         print(
             f"{index} - query count: {row['Query_Count']},"
-            f" query char count: {row['Query_Char_Count']},",
-            f" bytes processed sum: {row['Bytes_Processed']},"
-            f" slot millis sum: {row['Slot_Millis']},"
-            f" local execution time: {formatted_local_exec_time} seconds,"
-            f" bigquery execution time: {round(row['BigQuery_Execution_Time_Sec'], 1)} seconds",
+            + f" query char count: {row['Query_Char_Count']},"
+            + (
+                f" bytes processed sum: {row['Bytes_Processed']},"
+                if has_full_metrics
+                else ""
+            )
+            + (f" slot millis sum: {row['Slot_Millis']}," if has_full_metrics else "")
+            + f" local execution time: {formatted_local_exec_time} seconds"
+            + (
+                f", bigquery execution time: {round(row['BigQuery_Execution_Time_Sec'], 1)} seconds"
+                if has_full_metrics
+                else ""
+            )
         )
 
     geometric_mean_queries = geometric_mean_excluding_zeros(
@@ -221,12 +236,24 @@
     )
 
     print(
-        f"---Geometric mean of queries: {geometric_mean_queries}, "
-        f"Geometric mean of queries char counts: {geometric_mean_query_char_count}, "
-        f"Geometric mean of bytes processed: {geometric_mean_bytes}, "
-        f"Geometric mean of slot millis: {geometric_mean_slot_millis}, "
-        f"Geometric mean of local execution time: {geometric_mean_local_seconds} seconds, "
-        f"Geometric mean of BigQuery execution time: {geometric_mean_bq_seconds} seconds---"
+        f"---Geometric mean of queries: {geometric_mean_queries},"
+        + f" Geometric mean of queries char counts: {geometric_mean_query_char_count},"
+        + (
+            f" Geometric mean of bytes processed: {geometric_mean_bytes},"
+            if has_full_metrics
+            else ""
+        )
+        + (
+            f" Geometric mean of slot millis: {geometric_mean_slot_millis},"
+            if has_full_metrics
+            else ""
+        )
+        + f" Geometric mean of local execution time: {geometric_mean_local_seconds} seconds"
+        + (
+            f", Geometric mean of BigQuery execution time: {geometric_mean_bq_seconds} seconds---"
+            if has_full_metrics
+            else ""
+        )
     )
 
     error_message = (
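Since the two print calls above now assemble a single string from optional segments, runs without job metrics simply omit those fields rather than printing None. A small sketch of the pattern with made-up values (illustrative only):

    has_full_metrics = False
    row = {"Query_Count": 3, "Query_Char_Count": 210.0, "Bytes_Processed": 0}

    line = (
        f"0 - query count: {row['Query_Count']},"
        + f" query char count: {row['Query_Char_Count']},"
        + (
            f" bytes processed sum: {row['Bytes_Processed']},"
            if has_full_metrics
            else ""
        )
        + " local execution time: 1.2 seconds"
    )
    # -> "0 - query count: 3, query char count: 210.0, local execution time: 1.2 seconds"
    print(line)

As a side effect, switching from comma-separated f-string arguments to explicit "+" concatenation also removes the stray trailing comma the old per-row print had after its query-char-count segment, which made print treat the remaining segments as separate arguments.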