Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .kokoro/continuous/notebook.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ env_vars: {
value: "notebook"
}

env_vars: {
key: "BENCHMARK_AND_PUBLISH"
value: "true"
}

env_vars: {
key: "GOOGLE_CLOUD_PROJECT"
value: "bigframes-load-testing"
Expand Down
5 changes: 5 additions & 0 deletions .kokoro/load/benchmark.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ env_vars: {
value: "benchmark"
}

env_vars: {
key: "BENCHMARK_AND_PUBLISH"
value: "true"
}

env_vars: {
key: "GOOGLE_CLOUD_PROJECT"
value: "bigframes-load-testing"
Expand Down
30 changes: 23 additions & 7 deletions bigframes/session/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ class ExecutionMetrics:
def count_job_stats(self, query_job: bq_job.QueryJob):
stats = get_performance_stats(query_job)
if stats is not None:
bytes_processed, slot_millis = stats
bytes_processed, slot_millis, exec_seconds = stats
self.execution_count += 1
self.bytes_processed += bytes_processed
self.slot_millis += slot_millis
if LOGGING_NAME_ENV_VAR in os.environ:
# when running notebooks via pytest nbmake
write_stats_to_disk(bytes_processed, slot_millis)
write_stats_to_disk(bytes_processed, slot_millis, exec_seconds)


def get_performance_stats(query_job: bigquery.QueryJob) -> Optional[Tuple[int, int]]:
def get_performance_stats(
query_job: bigquery.QueryJob,
) -> Optional[Tuple[int, int, float]]:
"""Parse the query job for performance stats.

Return None if the stats do not reflect real work done in bigquery.
Expand All @@ -57,14 +59,21 @@ def get_performance_stats(query_job: bigquery.QueryJob) -> Optional[Tuple[int, i
slot_millis = query_job.slot_millis
if not isinstance(slot_millis, int):
return None # filter out mocks

if query_job.configuration.dry_run:
# dry run stats are just predictions of the real run
slot_millis = 0

return bytes_processed, slot_millis
exec_seconds = (
(query_job.ended - query_job.created).total_seconds()
if query_job.created is not None and query_job.ended is not None
else None
)

return bytes_processed, slot_millis, exec_seconds


def write_stats_to_disk(bytes_processed: int, slot_millis: int):
def write_stats_to_disk(bytes_processed: int, slot_millis: int, exec_seconds: float):
"""For pytest runs only, log information about the query job
to a file in order to create a performance report.
"""
Expand All @@ -83,6 +92,13 @@ def write_stats_to_disk(bytes_processed: int, slot_millis: int):
f.write(str(bytes_processed) + "\n")

# store slot milliseconds
bytes_file = os.path.join(current_directory, test_name + ".slotmillis")
with open(bytes_file, "a") as f:
slot_file = os.path.join(current_directory, test_name + ".slotmillis")
with open(slot_file, "a") as f:
f.write(str(slot_millis) + "\n")

# store execution time seconds
exec_time_file = os.path.join(
current_directory, test_name + ".bq_exec_time_seconds"
)
with open(exec_time_file, "a") as f:
f.write(str(exec_seconds) + "\n")
166 changes: 37 additions & 129 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@

from __future__ import absolute_import

from multiprocessing import Process
import os
import pathlib
from pathlib import Path
import re
import shutil
from typing import Dict, List
Expand All @@ -42,7 +40,6 @@
"third_party",
"noxfile.py",
"setup.py",
os.path.join("scripts", "benchmark"),
]

DEFAULT_PYTHON_VERSION = "3.10"
Expand Down Expand Up @@ -686,7 +683,7 @@ def notebook(session: nox.Session):
"seaborn",
)

notebooks_list = list(Path("notebooks/").glob("*/*.ipynb"))
notebooks_list = list(pathlib.Path("notebooks/").glob("*/*.ipynb"))

denylist = [
# Regionalized testing is manually added later.
Expand All @@ -698,7 +695,7 @@ def notebook(session: nox.Session):
# With the notebooks_fill_params.py script, we are able to find and
# replace the PROJECT_ID parameter, but not the others.
#
# TODO(ashleyxu): Test these notebooks by replacing parameters with
# TODO(b/357904266): Test these notebooks by replacing parameters with
# appropriate values and omitting cleanup logic that may break
# our test infrastructure.
"notebooks/getting_started/ml_fundamentals_bq_dataframes.ipynb", # Needs DATASET.
Expand Down Expand Up @@ -748,17 +745,6 @@ def notebook(session: nox.Session):
for nb in notebooks + list(notebooks_reg):
assert os.path.exists(nb), nb

# TODO(shobs): For some reason --retries arg masks exceptions occurred in
# notebook failures, and shows unhelpful INTERNALERROR. Investigate that
# and enable retries if we can find a way to surface the real exception
# bacause the notebook is running against real GCP and something may fail
# due to transient issues.
pytest_command = [
"py.test",
"--nbmake",
"--nbmake-timeout=900", # 15 minutes
]

try:
# Populate notebook parameters and make a backup so that the notebooks
# are runnable.
Expand All @@ -767,22 +753,23 @@ def notebook(session: nox.Session):
CURRENT_DIRECTORY / "scripts" / "notebooks_fill_params.py",
*notebooks,
)

# Run notebooks in parallel session.run's, since each notebook
# takes an environment variable for performance logging
processes = []
for notebook in notebooks:
process = Process(
target=session.run,
args=(*pytest_command, notebook),
kwargs={"env": {LOGGING_NAME_ENV_VAR: os.path.basename(notebook)}},
session.run(
"python",
"scripts/run_and_publish_benchmark.py",
"--notebook",
f"--benchmark-path={notebook}",
)
process.start()
processes.append(process)

for process in processes:
process.join()

for notebook, regions in notebooks_reg.items():
for region in regions:
session.run(
"python",
"scripts/run_and_publish_benchmark.py",
"--notebook",
f"--benchmark-path={notebook}",
f"--region={region}",
)
finally:
# Prevent our notebook changes from getting checked in to git
# accidentally.
Expand All @@ -791,116 +778,37 @@ def notebook(session: nox.Session):
CURRENT_DIRECTORY / "scripts" / "notebooks_restore_from_backup.py",
*notebooks,
)

# Additionally run regionalized notebooks in parallel session.run's.
# Each notebook takes a different region via env param.
processes = []
for notebook, regions in notebooks_reg.items():
for region in regions:
process = Process(
target=session.run,
args=(*pytest_command, notebook),
kwargs={
"env": {
"BIGQUERY_LOCATION": region,
LOGGING_NAME_ENV_VAR: os.path.basename(notebook),
}
},
)
process.start()
processes.append(process)

for process in processes:
process.join()

# when the environment variable is set as it is above,
# notebooks output a .bytesprocessed and .slotmillis report
# collect those reports and print a summary
_print_performance_report("notebooks/")
session.run(
"python",
"scripts/run_and_publish_benchmark.py",
"--notebook",
"--publish-benchmarks=notebooks/",
)


@nox.session(python=DEFAULT_PYTHON_VERSION)
def benchmark(session: nox.Session):
session.install("-e", ".[all]")
base_path = os.path.join("scripts", "benchmark")

benchmark_script_list = list(Path(base_path).rglob("*.py"))
# Run benchmarks in parallel session.run's, since each benchmark
# takes an environment variable for performance logging
processes = []
for benchmark in benchmark_script_list:
process = Process(
target=session.run,
args=("python", benchmark),
kwargs={"env": {LOGGING_NAME_ENV_VAR: benchmark.as_posix()}},
)
process.start()
processes.append(process)

for process in processes:
process.join()

# when the environment variable is set as it is above,
# notebooks output a .bytesprocessed and .slotmillis report
# collect those reports and print a summary
_print_performance_report(base_path)
base_path = os.path.join("tests", "benchmark")

benchmark_script_list = list(pathlib.Path(base_path).rglob("*.py"))

def _print_performance_report(path: str):
"""Add an informational report about http queries, bytes
processed, and slot time to the testlog output for purposes
of measuring bigquery-related performance changes.

Looks specifically for output files in subfolders of the
passed path. (*/*.bytesprocessed and */*.slotmillis)
"""
print("---BIGQUERY USAGE REPORT---")
results_dict = {}
bytes_reports = sorted(Path(path).rglob("*.bytesprocessed"))
for bytes_report in bytes_reports:
with open(bytes_report, "r") as bytes_file:
filename = bytes_report.relative_to(path).with_suffix("")
lines = bytes_file.read().splitlines()
query_count = len(lines)
total_bytes = sum([int(line) for line in lines])
results_dict[filename] = [query_count, total_bytes]
os.remove(bytes_report)

millis_reports = sorted(Path(path).rglob("*.slotmillis"))
for millis_report in millis_reports:
with open(millis_report, "r") as millis_file:
filename = millis_report.relative_to(path).with_suffix("")
lines = millis_file.read().splitlines()
total_slot_millis = sum([int(line) for line in lines])
results_dict[filename] += [total_slot_millis]
os.remove(millis_report)

cumulative_queries = 0
cumulative_bytes = 0
cumulative_slot_millis = 0
for name, results in results_dict.items():
if len(results) != 3:
raise IOError(
"Mismatch in performance logging output. "
"Expected one .bytesprocessed and one .slotmillis "
"file for each notebook."
try:
for benchmark in benchmark_script_list:
if benchmark.name in ("__init__.py", "utils.py"):
continue
session.run(
"python",
"scripts/run_and_publish_benchmark.py",
f"--benchmark-path={benchmark}",
)
query_count, total_bytes, total_slot_millis = results
cumulative_queries += query_count
cumulative_bytes += total_bytes
cumulative_slot_millis += total_slot_millis
print(
f"{name} - query count: {query_count},"
f" bytes processed sum: {total_bytes},"
f" slot millis sum: {total_slot_millis}"
finally:
session.run(
"python",
"scripts/run_and_publish_benchmark.py",
f"--publish-benchmarks={base_path}",
)

print(
f"---total queries: {cumulative_queries}, "
f"total bytes: {cumulative_bytes}, "
f"total slot millis: {cumulative_slot_millis}---"
)


@nox.session(python="3.10")
def release_dry_run(session):
Expand Down
14 changes: 0 additions & 14 deletions scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q1.py

This file was deleted.

16 changes: 0 additions & 16 deletions scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q10.py

This file was deleted.

14 changes: 0 additions & 14 deletions scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q2.py

This file was deleted.

14 changes: 0 additions & 14 deletions scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q3.py

This file was deleted.

16 changes: 0 additions & 16 deletions scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q4.py

This file was deleted.

Loading