diff --git a/activitysim/abm/models/accessibility.py b/activitysim/abm/models/accessibility.py index 16d0e85597..328744374f 100644 --- a/activitysim/abm/models/accessibility.py +++ b/activitysim/abm/models/accessibility.py @@ -52,6 +52,7 @@ def compute_accessibilities_for_zones( trace_od_rows = None # merge land_use_columns into od_df + logger.info(f"{trace_label}: merge land_use_columns into od_df") od_df = pd.merge(od_df, land_use_df, left_on='dest', right_index=True).sort_index() chunk.log_df(trace_label, "od_df", od_df) @@ -69,11 +70,13 @@ def compute_accessibilities_for_zones( if network_los.zone_system == los.THREE_ZONE: locals_d['tvpb'] = network_los.tvpb + logger.info(f"{trace_label}: assign.assign_variables") results, trace_results, trace_assigned_locals \ = assign.assign_variables(assignment_spec, od_df, locals_d, trace_rows=trace_od_rows, trace_label=trace_label, chunk_log=True) chunk.log_df(trace_label, "results", results) + logger.info(f"{trace_label}: have results") # accessibility_df = accessibility_df.copy() for column in results.columns: diff --git a/activitysim/abm/models/initialize.py b/activitysim/abm/models/initialize.py index d72a88f9da..b0f05ab42a 100644 --- a/activitysim/abm/models/initialize.py +++ b/activitysim/abm/models/initialize.py @@ -184,11 +184,11 @@ def preload_injectables(): t0 = tracing.print_elapsed_time() - # FIXME - still want to do this? - # if inject.get_injectable('skim_dict', None) is not None: - # t0 = tracing.print_elapsed_time("preload skim_dict", t0, debug=True) - # - # if inject.get_injectable('skim_stack', None) is not None: - # t0 = tracing.print_elapsed_time("preload skim_stack", t0, debug=True) + if config.setting('benchmarking', False): + # we don't want to pay for skim_dict inside any model component during + # benchmarking, so we'll preload skim_dict here. Preloading is not needed + # for regular operation, as activitysim components can load-on-demand. + if inject.get_injectable('skim_dict', None) is not None: + t0 = tracing.print_elapsed_time("preload skim_dict", t0, debug=True) return True diff --git a/activitysim/abm/models/location_choice.py b/activitysim/abm/models/location_choice.py index 5b3b7d1e47..5dbafb2648 100644 --- a/activitysim/abm/models/location_choice.py +++ b/activitysim/abm/models/location_choice.py @@ -886,6 +886,10 @@ def workplace_location( # if multiprocessing.current_process().name =='mp_households_0': # raise RuntimeError(f"fake fail {process_name}") + # disable locutor for benchmarking + if config.setting('benchmarking', False): + locutor = False + iterate_location_choice( model_settings, persons_merged, persons, households, @@ -917,6 +921,10 @@ def school_location( if estimator: write_estimation_specs(estimator, model_settings, 'school_location.yaml') + # disable locutor for benchmarking + if config.setting('benchmarking', False): + locutor = False + iterate_location_choice( model_settings, persons_merged, persons, households, diff --git a/activitysim/benchmarking/__init__.py b/activitysim/benchmarking/__init__.py new file mode 100644 index 0000000000..ee674e1b35 --- /dev/null +++ b/activitysim/benchmarking/__init__.py @@ -0,0 +1 @@ +from . import componentwise diff --git a/activitysim/benchmarking/asv.conf.json b/activitysim/benchmarking/asv.conf.json new file mode 100644 index 0000000000..ca13605795 --- /dev/null +++ b/activitysim/benchmarking/asv.conf.json @@ -0,0 +1,171 @@ +{ + // The version of the config file format. Do not change, unless + // you know what you are doing. 
+ "version": 1, + + // The name of the project being benchmarked + "project": "activitysim", + + // The project's homepage + "project_url": "https://activitysim.github.io/", + + // The URL or local path of the source code repository for the + // project being benchmarked + "repo": ".", + + // The Python project's subdirectory in your repo. If missing or + // the empty string, the project is assumed to be located at the root + // of the repository. + // "repo_subdir": "", + + // Customizable commands for building, installing, and + // uninstalling the project. See asv.conf.json documentation. + // + // "install_command": ["in-dir={env_dir} python -mpip install {wheel_file}"], + // "uninstall_command": ["return-code=any python -mpip uninstall -y {project}"], + // "build_command": [ + // "python setup.py build", + // "PIP_NO_BUILD_ISOLATION=false python -mpip wheel --no-deps --no-index -w {build_cache_dir} {build_dir}" + // ], + + // List of branches to benchmark. If not provided, defaults to "master" + // (for git) or "default" (for mercurial). + // "branches": ["master"], // for git + // "branches": ["default"], // for mercurial + + // The DVCS being used. If not set, it will be automatically + // determined from "repo" by looking at the protocol in the URL + // (if remote), or by looking for special directories, such as + // ".git" (if local). + // "dvcs": "git", + + // The tool to use to create environments. May be "conda", + // "virtualenv" or other value depending on the plugins in use. + // If missing or the empty string, the tool will be automatically + // determined by looking for tools on the PATH environment + // variable. + "environment_type": "conda", + + // timeout in seconds for installing any dependencies in environment + // defaults to 10 min + //"install_timeout": 600, + + // the base URL to show a commit for the project. + "show_commit_url": "http://github.com/ActivitySim/activitysim/commit/", + + // The Pythons you'd like to test against. If not provided, defaults + // to the current version of Python used to run `asv`. + // "pythons": ["2.7", "3.6"], + + // The list of conda channel names to be searched for benchmark + // dependency packages in the specified order + "conda_channels": ["conda-forge"], + + // The matrix of dependencies to test. Each key is the name of a + // package (in PyPI) and the values are version numbers. An empty + // list or empty string indicates to just test against the default + // (latest) version. null indicates that the package is to not be + // installed. If the package to be tested is only available from + // PyPi, and the 'environment_type' is conda, then you can preface + // the package name by 'pip+', and the package will be installed via + // pip (with all the conda available packages installed first, + // followed by the pip installed packages). + // + "matrix": { + "pyarrow": [], + "numpy": [], + "openmatrix": [], + "pandas": ["1.2"], + "pyyaml": [], + "pytables": [], + "toolz": [], + "orca": [], + "psutil": [], + "requests": [], + "numba": [], + "coverage": [], + "pytest": [], + "cytoolz": [] + }, + + // Combinations of libraries/python versions can be excluded/included + // from the set to test. Each entry is a dictionary containing additional + // key-value pairs to include/exclude. + // + // An exclude entry excludes entries where all values match. The + // values are regexps that should match the whole string. + // + // An include entry adds an environment. Only the packages listed + // are installed. 
The 'python' key is required. The exclude rules + // do not apply to includes. + // + // In addition to package names, the following keys are available: + // + // - python + // Python version, as in the *pythons* variable above. + // - environment_type + // Environment type, as above. + // - sys_platform + // Platform, as in sys.platform. Possible values for the common + // cases: 'linux2', 'win32', 'cygwin', 'darwin'. + // + // "exclude": [ + // {"python": "3.2", "sys_platform": "win32"}, // skip py3.2 on windows + // {"environment_type": "conda", "six": null}, // don't run without six on conda + // ], + // + // "include": [ + // // additional env for python2.7 + // {"python": "2.7", "numpy": "1.8"}, + // // additional env if run on windows+conda + // {"platform": "win32", "environment_type": "conda", "python": "2.7", "libpython": ""}, + // ], + + // The directory (relative to the current directory) that benchmarks are + // stored in. If not provided, defaults to "benchmarks" + // "benchmark_dir": "benchmarks", + + // The directory (relative to the current directory) to cache the Python + // environments in. If not provided, defaults to "env" + "env_dir": "../activitysim-asv/env", + + // The directory (relative to the current directory) that raw benchmark + // results are stored in. If not provided, defaults to "results". + "results_dir": "../activitysim-asv/results", + + // The directory (relative to the current directory) that the html tree + // should be written to. If not provided, defaults to "html". + "html_dir": "../activitysim-asv/html", + + // The number of characters to retain in the commit hashes. + // "hash_length": 8, + + // `asv` will cache results of the recent builds in each + // environment, making them faster to install next time. This is + // the number of builds to keep, per environment. + // "build_cache_size": 2, + + // The commits after which the regression search in `asv publish` + // should start looking for regressions. Dictionary whose keys are + // regexps matching to benchmark names, and values corresponding to + // the commit (exclusive) after which to start looking for + // regressions. The default is to start from the first commit + // with results. If the commit is `null`, regression detection is + // skipped for the matching benchmark. + // + // "regressions_first_commits": { + // "some_benchmark": "352cdf", // Consider regressions only after this commit + // "another_benchmark": null, // Skip regression detection altogether + // }, + + // The thresholds for relative change in results, after which `asv + // publish` starts reporting regressions. Dictionary of the same + // form as in ``regressions_first_commits``, with values + // indicating the thresholds. If multiple entries match, the + // maximum is taken. If no entry matches, the default is 5%. 
+ // + // "regressions_thresholds": { + // "some_benchmark": 0.01, // Threshold of 1% + // "another_benchmark": 0.5, // Threshold of 50% + // }, +} diff --git a/activitysim/benchmarking/benchmarks/__init__.py b/activitysim/benchmarking/benchmarks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/activitysim/benchmarking/benchmarks/mtc1full.py b/activitysim/benchmarking/benchmarks/mtc1full.py new file mode 100644 index 0000000000..bad2dcb247 --- /dev/null +++ b/activitysim/benchmarking/benchmarks/mtc1full.py @@ -0,0 +1,83 @@ +from activitysim.benchmarking.componentwise import ( + template_component_timings, + template_setup_cache, +) + +EXAMPLE_NAME = "example_mtc_full" +CONFIGS_DIRS = ("configs",) +DYNAMIC_CONFIG_DIR = "bench_configs" +DATA_DIR = "data" +OUTPUT_DIR = "output" +COMPONENT_NAMES = [ + # "compute_accessibility", + "school_location", + "workplace_location", + "auto_ownership_simulate", + "free_parking", + "cdap_simulate", + "mandatory_tour_frequency", + "mandatory_tour_scheduling", + "joint_tour_frequency", + "joint_tour_composition", + "joint_tour_participation", + "joint_tour_destination", + "joint_tour_scheduling", + "non_mandatory_tour_frequency", + "non_mandatory_tour_destination", + "non_mandatory_tour_scheduling", + "tour_mode_choice_simulate", + "atwork_subtour_frequency", + "atwork_subtour_destination", + "atwork_subtour_scheduling", + "atwork_subtour_mode_choice", + "stop_frequency", + "trip_purpose", + "trip_destination", + "trip_purpose_and_destination", + "trip_scheduling", + "trip_mode_choice", + # "write_data_dictionary", + # "track_skim_usage", + "write_trip_matrices", + # "write_tables", +] +BENCHMARK_SETTINGS = { + "households_sample_size": 48_769, +} +SKIM_CACHE = False +PRELOAD_INJECTABLES = ("skim_dict",) +REPEAT = 1 +NUMBER = 1 +TIMEOUT = 36000.0 # ten hours +VERSION = "1" + + +def setup_cache(): + template_setup_cache( + EXAMPLE_NAME, + COMPONENT_NAMES, + BENCHMARK_SETTINGS, + dict( + read_skim_cache=SKIM_CACHE, + write_skim_cache=SKIM_CACHE, + ), + CONFIGS_DIRS, + DATA_DIR, + OUTPUT_DIR, + config_overload_dir=DYNAMIC_CONFIG_DIR, + ) + + +template_component_timings( + globals(), + COMPONENT_NAMES, + EXAMPLE_NAME, + (DYNAMIC_CONFIG_DIR, *CONFIGS_DIRS), + DATA_DIR, + OUTPUT_DIR, + PRELOAD_INJECTABLES, + REPEAT, + NUMBER, + TIMEOUT, + VERSION, +) diff --git a/activitysim/benchmarking/benchmarks/mtc1mp4.py b/activitysim/benchmarking/benchmarks/mtc1mp4.py new file mode 100644 index 0000000000..917f730495 --- /dev/null +++ b/activitysim/benchmarking/benchmarks/mtc1mp4.py @@ -0,0 +1,81 @@ +from activitysim.benchmarking.componentwise import ( + template_setup_cache, + template_component_timings_mp, +) + +import multiprocessing +import numpy as np + +PRETTY_NAME = "MTC1_MP4" +EXAMPLE_NAME = "example_mtc_full" +NUM_PROCESSORS = int(np.clip(multiprocessing.cpu_count() - 2, 2, 4)) +CONFIGS_DIRS = ("configs_mp", "configs") +DYNAMIC_CONFIG_DIR = "bench_configs_mp" +DATA_DIR = "data" +OUTPUT_DIR = "output_mp" +COMPONENT_NAMES = [ + "school_location", + "workplace_location", + "auto_ownership_simulate", + "free_parking", + "cdap_simulate", + "mandatory_tour_frequency", + "mandatory_tour_scheduling", + "joint_tour_frequency", + "joint_tour_composition", + "joint_tour_participation", + "joint_tour_destination", + "joint_tour_scheduling", + "non_mandatory_tour_frequency", + "non_mandatory_tour_destination", + "non_mandatory_tour_scheduling", + "tour_mode_choice_simulate", + "atwork_subtour_frequency", + "atwork_subtour_destination", + 
"atwork_subtour_scheduling", + "atwork_subtour_mode_choice", + "stop_frequency", + "trip_purpose", + "trip_destination", + "trip_purpose_and_destination", + "trip_scheduling", + "trip_mode_choice", +] +BENCHMARK_SETTINGS = { + # TODO: This multiprocess benchmarking is minimally functional, + # but has a bad habit of crashing due to memory allocation errors on + # all but the tiniest of examples. It would be great to fix the MP + # benchmarks so they use chunking, automatically configure for available + # RAM, and run a training-production cycle to get useful timing results. + "households_sample_size": 400, + "num_processes": NUM_PROCESSORS, +} +SKIM_CACHE = False +TIMEOUT = 36000.0 # ten hours +VERSION = "1" + + +def setup_cache(): + template_setup_cache( + EXAMPLE_NAME, + COMPONENT_NAMES, + BENCHMARK_SETTINGS, + dict( + read_skim_cache=SKIM_CACHE, + write_skim_cache=SKIM_CACHE, + ), + CONFIGS_DIRS, + DATA_DIR, + OUTPUT_DIR, + config_overload_dir=DYNAMIC_CONFIG_DIR, + ) + + +template_component_timings_mp( + globals(), + COMPONENT_NAMES, + EXAMPLE_NAME, + OUTPUT_DIR, + PRETTY_NAME, + VERSION, +) diff --git a/activitysim/benchmarking/benchmarks/sandag1example.py b/activitysim/benchmarking/benchmarks/sandag1example.py new file mode 100644 index 0000000000..fb5f45bf2c --- /dev/null +++ b/activitysim/benchmarking/benchmarks/sandag1example.py @@ -0,0 +1,43 @@ +from activitysim.benchmarking.componentwise import ( + template_component_timings, + template_setup_cache, +) + +from .sandag_example import * + +EXAMPLE_NAME = "example_sandag_1_zone" +CONFIGS_DIRS = ("configs_1_zone", "example_mtc/configs") +DYNAMIC_CONFIG_DIR = "bench_configs" +DATA_DIR = "data_1" +OUTPUT_DIR = "output_1" +VERSION = "1" + + +def setup_cache(): + template_setup_cache( + EXAMPLE_NAME, + COMPONENT_NAMES, + BENCHMARK_SETTINGS, + dict( + read_skim_cache=SKIM_CACHE, + write_skim_cache=SKIM_CACHE, + ), + CONFIGS_DIRS, + DATA_DIR, + OUTPUT_DIR, + ) + + +template_component_timings( + globals(), + COMPONENT_NAMES, + EXAMPLE_NAME, + (DYNAMIC_CONFIG_DIR, *CONFIGS_DIRS), + DATA_DIR, + OUTPUT_DIR, + PRELOAD_INJECTABLES, + REPEAT, + NUMBER, + TIMEOUT, + VERSION, +) diff --git a/activitysim/benchmarking/benchmarks/sandag1full.py b/activitysim/benchmarking/benchmarks/sandag1full.py new file mode 100644 index 0000000000..b892d2771e --- /dev/null +++ b/activitysim/benchmarking/benchmarks/sandag1full.py @@ -0,0 +1,43 @@ +from activitysim.benchmarking.componentwise import ( + template_component_timings, + template_setup_cache, +) + +from .sandag_full import * + +EXAMPLE_NAME = "example_sandag_1_zone_full" +CONFIGS_DIRS = ("configs_benchmarking", "configs_1_zone", "example_mtc/configs") +DYNAMIC_CONFIG_DIR = "bench_configs" +DATA_DIR = "data_1" +OUTPUT_DIR = "output_1" +VERSION = "1" + + +def setup_cache(): + template_setup_cache( + EXAMPLE_NAME, + COMPONENT_NAMES, + BENCHMARK_SETTINGS, + dict( + read_skim_cache=SKIM_CACHE, + write_skim_cache=SKIM_CACHE, + ), + CONFIGS_DIRS, + DATA_DIR, + OUTPUT_DIR, + ) + + +template_component_timings( + globals(), + COMPONENT_NAMES, + EXAMPLE_NAME, + (DYNAMIC_CONFIG_DIR, *CONFIGS_DIRS), + DATA_DIR, + OUTPUT_DIR, + PRELOAD_INJECTABLES, + REPEAT, + NUMBER, + TIMEOUT, + VERSION, +) diff --git a/activitysim/benchmarking/benchmarks/sandag2example.py b/activitysim/benchmarking/benchmarks/sandag2example.py new file mode 100644 index 0000000000..9903d0ec0e --- /dev/null +++ b/activitysim/benchmarking/benchmarks/sandag2example.py @@ -0,0 +1,43 @@ +from activitysim.benchmarking.componentwise import ( + 
template_component_timings, + template_setup_cache, +) + +from .sandag_example import * + +EXAMPLE_NAME = "example_sandag_2_zone" +CONFIGS_DIRS = ("configs_2_zone", "example_psrc/configs") +DYNAMIC_CONFIG_DIR = "bench_configs" +DATA_DIR = "data_2" +OUTPUT_DIR = "output_2" +VERSION = "1" + + +def setup_cache(): + template_setup_cache( + EXAMPLE_NAME, + COMPONENT_NAMES, + BENCHMARK_SETTINGS, + dict( + read_skim_cache=SKIM_CACHE, + write_skim_cache=SKIM_CACHE, + ), + CONFIGS_DIRS, + DATA_DIR, + OUTPUT_DIR, + ) + + +template_component_timings( + globals(), + COMPONENT_NAMES, + EXAMPLE_NAME, + (DYNAMIC_CONFIG_DIR, *CONFIGS_DIRS), + DATA_DIR, + OUTPUT_DIR, + PRELOAD_INJECTABLES, + REPEAT, + NUMBER, + TIMEOUT, + VERSION, +) diff --git a/activitysim/benchmarking/benchmarks/sandag2full.py b/activitysim/benchmarking/benchmarks/sandag2full.py new file mode 100644 index 0000000000..974cb3f219 --- /dev/null +++ b/activitysim/benchmarking/benchmarks/sandag2full.py @@ -0,0 +1,43 @@ +from activitysim.benchmarking.componentwise import ( + template_component_timings, + template_setup_cache, +) + +from .sandag_full import * + +EXAMPLE_NAME = "example_sandag_2_zone_full" +CONFIGS_DIRS = ("configs_benchmarking", "configs_2_zone", "example_psrc/configs") +DYNAMIC_CONFIG_DIR = "bench_configs" +DATA_DIR = "data_2" +OUTPUT_DIR = "output_2" +VERSION = "1" + + +def setup_cache(): + template_setup_cache( + EXAMPLE_NAME, + COMPONENT_NAMES, + BENCHMARK_SETTINGS, + dict( + read_skim_cache=SKIM_CACHE, + write_skim_cache=SKIM_CACHE, + ), + CONFIGS_DIRS, + DATA_DIR, + OUTPUT_DIR, + ) + + +template_component_timings( + globals(), + COMPONENT_NAMES, + EXAMPLE_NAME, + (DYNAMIC_CONFIG_DIR, *CONFIGS_DIRS), + DATA_DIR, + OUTPUT_DIR, + PRELOAD_INJECTABLES, + REPEAT, + NUMBER, + TIMEOUT, + VERSION, +) diff --git a/activitysim/benchmarking/benchmarks/sandag3example.py b/activitysim/benchmarking/benchmarks/sandag3example.py new file mode 100644 index 0000000000..aa040b1448 --- /dev/null +++ b/activitysim/benchmarking/benchmarks/sandag3example.py @@ -0,0 +1,43 @@ +from activitysim.benchmarking.componentwise import ( + template_component_timings, + template_setup_cache, +) + +from .sandag_example import * + +EXAMPLE_NAME = "example_sandag_3_zone" +CONFIGS_DIRS = ("configs_3_zone", "example_mtc/configs") +DYNAMIC_CONFIG_DIR = "bench_configs" +DATA_DIR = "data_3" +OUTPUT_DIR = "output_3" +VERSION = "1" + + +def setup_cache(): + template_setup_cache( + EXAMPLE_NAME, + COMPONENT_NAMES, + BENCHMARK_SETTINGS, + dict( + read_skim_cache=SKIM_CACHE, + write_skim_cache=SKIM_CACHE, + ), + CONFIGS_DIRS, + DATA_DIR, + OUTPUT_DIR, + ) + + +template_component_timings( + globals(), + COMPONENT_NAMES, + EXAMPLE_NAME, + (DYNAMIC_CONFIG_DIR, *CONFIGS_DIRS), + DATA_DIR, + OUTPUT_DIR, + PRELOAD_INJECTABLES, + REPEAT, + NUMBER, + TIMEOUT, + VERSION, +) diff --git a/activitysim/benchmarking/benchmarks/sandag3full.py b/activitysim/benchmarking/benchmarks/sandag3full.py new file mode 100644 index 0000000000..727f66a32b --- /dev/null +++ b/activitysim/benchmarking/benchmarks/sandag3full.py @@ -0,0 +1,43 @@ +from activitysim.benchmarking.componentwise import ( + template_component_timings, + template_setup_cache, +) + +from .sandag_full import * + +EXAMPLE_NAME = "example_sandag_3_zone_full" +CONFIGS_DIRS = ("configs_benchmarking", "configs_3_zone", "example_mtc/configs") +DYNAMIC_CONFIG_DIR = "bench_configs" +DATA_DIR = "data_3" +OUTPUT_DIR = "output_3" +VERSION = "1" + + +def setup_cache(): + template_setup_cache( + EXAMPLE_NAME, + COMPONENT_NAMES, + 
BENCHMARK_SETTINGS, + dict( + read_skim_cache=SKIM_CACHE, + write_skim_cache=SKIM_CACHE, + ), + CONFIGS_DIRS, + DATA_DIR, + OUTPUT_DIR, + ) + + +template_component_timings( + globals(), + COMPONENT_NAMES, + EXAMPLE_NAME, + (DYNAMIC_CONFIG_DIR, *CONFIGS_DIRS), + DATA_DIR, + OUTPUT_DIR, + PRELOAD_INJECTABLES, + REPEAT, + NUMBER, + TIMEOUT, + VERSION, +) diff --git a/activitysim/benchmarking/benchmarks/sandag_example.py b/activitysim/benchmarking/benchmarks/sandag_example.py new file mode 100644 index 0000000000..b9c74c86eb --- /dev/null +++ b/activitysim/benchmarking/benchmarks/sandag_example.py @@ -0,0 +1,41 @@ +COMPONENT_NAMES = [ + # "compute_accessibility", + "school_location", + "workplace_location", + "auto_ownership_simulate", + "free_parking", + "cdap_simulate", + "mandatory_tour_frequency", + "mandatory_tour_scheduling", + "joint_tour_frequency", + "joint_tour_composition", + "joint_tour_participation", + "joint_tour_destination", + "joint_tour_scheduling", + "non_mandatory_tour_frequency", + "non_mandatory_tour_destination", + "non_mandatory_tour_scheduling", + "tour_mode_choice_simulate", + "atwork_subtour_frequency", + "atwork_subtour_destination", + "atwork_subtour_scheduling", + "atwork_subtour_mode_choice", + "stop_frequency", + "trip_purpose", + "trip_destination", + "trip_purpose_and_destination", + "trip_scheduling", + "trip_mode_choice", + # "write_data_dictionary", + # "track_skim_usage", + "write_trip_matrices", + # "write_tables", +] +BENCHMARK_SETTINGS = { + "households_sample_size": 48_769, +} +SKIM_CACHE = True +PRELOAD_INJECTABLES = ("skim_dict",) +REPEAT = 1 +NUMBER = 1 +TIMEOUT = 36000.0 # ten hours diff --git a/activitysim/benchmarking/benchmarks/sandag_full.py b/activitysim/benchmarking/benchmarks/sandag_full.py new file mode 100644 index 0000000000..51710a1af7 --- /dev/null +++ b/activitysim/benchmarking/benchmarks/sandag_full.py @@ -0,0 +1,41 @@ +COMPONENT_NAMES = [ + # "compute_accessibility", + "school_location", + "workplace_location", + "auto_ownership_simulate", + "free_parking", + "cdap_simulate", + "mandatory_tour_frequency", + "mandatory_tour_scheduling", + "joint_tour_frequency", + "joint_tour_composition", + "joint_tour_participation", + "joint_tour_destination", + "joint_tour_scheduling", + "non_mandatory_tour_frequency", + "non_mandatory_tour_destination", + "non_mandatory_tour_scheduling", + "tour_mode_choice_simulate", + "atwork_subtour_frequency", + "atwork_subtour_destination", + "atwork_subtour_scheduling", + "atwork_subtour_mode_choice", + "stop_frequency", + "trip_purpose", + "trip_destination", + "trip_purpose_and_destination", + "trip_scheduling", + "trip_mode_choice", + # "write_data_dictionary", + # "track_skim_usage", + "write_trip_matrices", + # "write_tables", +] +BENCHMARK_SETTINGS = { + "households_sample_size": 48_769, # match hh sample size in example data +} +SKIM_CACHE = False +PRELOAD_INJECTABLES = ("skim_dict",) +REPEAT = 1 +NUMBER = 1 +TIMEOUT = 36000.0 # ten hours diff --git a/activitysim/benchmarking/componentwise.py b/activitysim/benchmarking/componentwise.py new file mode 100644 index 0000000000..171a7369de --- /dev/null +++ b/activitysim/benchmarking/componentwise.py @@ -0,0 +1,701 @@ +import glob +import os +import logging +import logging.handlers +import numpy as np +import pandas as pd +import yaml +import traceback + +from ..cli.create import get_example +from ..core.pipeline import open_pipeline, run_model +from ..core import inject, tracing +from ..cli.run import config, pipeline, INJECTABLES +from . 
import workspace + +logger = logging.getLogger(__name__) + + +def reload_settings(settings_filename, **kwargs): + settings = config.read_settings_file(settings_filename, mandatory=True) + for k in kwargs: + settings[k] = kwargs[k] + inject.add_injectable("settings", settings) + return settings + + +def component_logging(component_name): + root_logger = logging.getLogger() + + CLOG_FMT = "%(asctime)s %(levelname)7s - %(name)s: %(message)s" + + logfilename = config.log_file_path(f"asv-{component_name}.log") + + # avoid creation of multiple file handlers for logging components + # as we will re-enter this function for every component run + for entry in root_logger.handlers: + if (isinstance(entry, logging.handlers.RotatingFileHandler)) and ( + entry.formatter._fmt == CLOG_FMT + ): + return + + tracing.config_logger(basic=True) + file_handler = logging.handlers.RotatingFileHandler( + filename=logfilename, + mode="a", + maxBytes=50_000_000, + backupCount=5, + ) + formatter = logging.Formatter( + fmt=CLOG_FMT, + datefmt="%Y-%m-%d %H:%M:%S", + ) + file_handler.setFormatter(formatter) + logging.getLogger().addHandler(file_handler) + + +def setup_component( + component_name, + working_dir=".", + preload_injectables=(), + configs_dirs=("configs"), + data_dir="data", + output_dir="output", + settings_filename="settings.yaml", +): + """ + Prepare to benchmark a model component. + + This function sets up everything, opens the pipeline, and + reloads table state from checkpoints of prior components. + All this happens here, before the model component itself + is actually executed inside the timed portion of the loop. + """ + if isinstance(configs_dirs, str): + configs_dirs = [configs_dirs] + inject.add_injectable( + "configs_dir", [os.path.join(working_dir, i) for i in configs_dirs] + ) + inject.add_injectable("data_dir", os.path.join(working_dir, data_dir)) + inject.add_injectable("output_dir", os.path.join(working_dir, output_dir)) + + reload_settings( + settings_filename, + benchmarking=component_name, + checkpoints=False, + ) + + component_logging(component_name) + logger.info("connected to component logger") + config.filter_warnings() + logging.captureWarnings(capture=True) + + # register abm steps and other abm-specific injectables outside of + # benchmark timing loop + if not inject.is_injectable("preload_injectables"): + logger.info("preload_injectables yes import") + from activitysim import abm + else: + logger.info("preload_injectables no import") + + # Extract the resume_after argument based on the model immediately + # prior to the component being benchmarked. + models = config.setting("models") + try: + component_index = models.index(component_name) + except ValueError: + # the last component to be benchmarked isn't included in the + # pre-checkpointed model list, we just resume from the end + component_index = len(models) + if component_index: + resume_after = models[component_index - 1] + else: + resume_after = None + + if config.setting("multiprocess", False): + raise NotImplementedError( + "multiprocess component benchmarking is not yet implemented" + ) + # Component level timings for multiprocess benchmarking + # are not generated using this code that re-runs individual + # components. Instead, those benchmarks are generated in + # aggregate during setup and then extracted from logs later. 
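+    # Re-opening the pipeline read-only ("r") with resume_after set to the
+    # model just before this component restores every table to its
+    # checkpointed state at that point, and leaves the checkpointed pipeline
+    # store untouched so the same store can be reused by every benchmarked
+    # component.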
+ else: + open_pipeline(resume_after, mode="r") + + for k in preload_injectables: + if inject.get_injectable(k, None) is not None: + logger.info("pre-loaded %s", k) + + # Directories Logging + for k in ["configs_dir", "settings_file_name", "data_dir", "output_dir"]: + logger.info(f"DIRECTORY {k}: {inject.get_injectable(k, None)}") + + # Settings Logging + log_settings = [ + "checkpoints", + "chunk_training_mode", + "chunk_size", + "chunk_method", + "trace_hh_id", + "households_sample_size", + "check_for_variability", + "use_shadow_pricing", + "want_dest_choice_sample_tables", + "log_alt_losers", + ] + for k in log_settings: + logger.info(f"SETTING {k}: {config.setting(k)}") + + logger.info("setup_component completed: %s", component_name) + + +def run_component(component_name): + logger.info("run_component: %s", component_name) + try: + if config.setting("multiprocess", False): + raise NotImplementedError( + "multiprocess component benchmarking is not yet implemented" + ) + # Component level timings for multiprocess benchmarking + # are not generated using this code that re-runs individual + # components. Instead, those benchmarks are generated in + # aggregate during setup and then extracted from logs later. + else: + run_model(component_name) + except Exception as err: + logger.exception("run_component exception: %s", component_name) + raise + else: + logger.info("run_component completed: %s", component_name) + return 0 + + +def teardown_component(component_name): + logger.info("teardown_component: %s", component_name) + + # use the pipeline module to clear out all the orca tables, so + # the next benchmark run has a clean slate. + # anything needed should be reloaded from the pipeline checkpoint file + pipeline_tables = pipeline.registered_tables() + for table_name in pipeline_tables: + logger.info("dropping table %s", table_name) + pipeline.drop_table(table_name) + + if config.setting("multiprocess", False): + raise NotImplementedError("multiprocess benchmarking is not yet implemented") + else: + pipeline.close_pipeline() + logger.critical( + "teardown_component completed: %s\n\n%s\n\n", component_name, "~" * 88 + ) + return 0 + + +def pre_run( + model_working_dir, + configs_dirs=None, + data_dir="data", + output_dir="output", + settings_file_name=None, +): + """ + Pre-run the models, checkpointing everything. + + By checkpointing everything, it is possible to run each benchmark + by recreating the state of the pipeline immediately prior to that + component. + + Parameters + ---------- + model_working_dir : str + Path to the model working directory, generally inside the + benchmarking workspace. + configs_dirs : Iterable[str], optional + Override the config dirs, similar to using -c on the command line + for a model run. + data_dir : str, optional + Override the data directory similar to using -d on the command line + for a model run. + output_dir : str, optional + Override the output directory similar to using -o on the command line + for a model run. + settings_file_name : str, optional + Override the settings file name, similar to using -s on the command line + for a model run. 
+ """ + if configs_dirs is None: + inject.add_injectable("configs_dir", os.path.join(model_working_dir, "configs")) + else: + configs_dirs_ = [os.path.join(model_working_dir, i) for i in configs_dirs] + inject.add_injectable("configs_dir", configs_dirs_) + inject.add_injectable("data_dir", os.path.join(model_working_dir, data_dir)) + inject.add_injectable("output_dir", os.path.join(model_working_dir, output_dir)) + + if settings_file_name is not None: + inject.add_injectable("settings_file_name", settings_file_name) + + # Always pre_run from the beginning + config.override_setting("resume_after", None) + + # register abm steps and other abm-specific injectables + if not inject.is_injectable("preload_injectables"): + from activitysim import ( + abm, + ) # register abm steps and other abm-specific injectables + + if settings_file_name is not None: + inject.add_injectable("settings_file_name", settings_file_name) + + # cleanup + # cleanup_output_files() + + tracing.config_logger(basic=False) + config.filter_warnings() + logging.captureWarnings(capture=True) + + # directories + for k in ["configs_dir", "settings_file_name", "data_dir", "output_dir"]: + logger.info("SETTING %s: %s" % (k, inject.get_injectable(k, None))) + + log_settings = inject.get_injectable("log_settings", {}) + for k in log_settings: + logger.info("SETTING %s: %s" % (k, config.setting(k))) + + # OMP_NUM_THREADS: openmp + # OPENBLAS_NUM_THREADS: openblas + # MKL_NUM_THREADS: mkl + for env in ["MKL_NUM_THREADS", "OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS"]: + logger.info(f"ENV {env}: {os.getenv(env)}") + + np_info_keys = [ + "atlas_blas_info", + "atlas_blas_threads_info", + "atlas_info", + "atlas_threads_info", + "blas_info", + "blas_mkl_info", + "blas_opt_info", + "lapack_info", + "lapack_mkl_info", + "lapack_opt_info", + "mkl_info", + ] + + for cfg_key in np_info_keys: + info = np.__config__.get_info(cfg_key) + if info: + for info_key in ["libraries"]: + if info_key in info: + logger.info(f"NUMPY {cfg_key} {info_key}: {info[info_key]}") + + t0 = tracing.print_elapsed_time() + + logger.info(f"MODELS: {config.setting('models')}") + + if config.setting("multiprocess", False): + logger.info("run multi-process complete simulation") + else: + logger.info("run single process simulation") + pipeline.run(models=config.setting("models")) + pipeline.close_pipeline() + + tracing.print_elapsed_time("prerun required models for checkpointing", t0) + + return 0 + + +def run_multiprocess(): + logger.info("run multiprocess simulation") + tracing.delete_trace_files() + tracing.delete_output_files("h5") + tracing.delete_output_files("csv") + tracing.delete_output_files("txt") + tracing.delete_output_files("yaml") + tracing.delete_output_files("prof") + tracing.delete_output_files("omx") + + from activitysim.core import mp_tasks + + injectables = {k: inject.get_injectable(k) for k in INJECTABLES} + mp_tasks.run_multiprocess(injectables) + + assert not pipeline.is_open() + + if config.setting("cleanup_pipeline_after_run", False): + pipeline.cleanup_pipeline() + + +######## + + +def local_dir(): + benchmarking_directory = workspace.get_dir() + if benchmarking_directory is not None: + return benchmarking_directory + return os.getcwd() + + +def model_dir(*subdirs): + return os.path.join(local_dir(), "models", *subdirs) + + +def template_setup_cache( + example_name, + component_names, + benchmark_settings, + benchmark_network_los, + config_dirs=("configs",), + data_dir="data", + output_dir="output", + settings_filename="settings.yaml", + 
skip_component_names=None, + config_overload_dir="dynamic_configs", +): + """ + Prepare an example model for benchmarking. + + The algorithm for benchmarking components requires an existing pipeline + file with checkpoints after every component, which allows the code to + recreate the state of pipeline tables immediately prior to each component's + execution. The checkpoints file is very large, and can be recreated + by running the model, so it is not stored/downloaded but just rebuilt as + needed. + + This template function creates that pipeline store if it does not exist by + getting the example model and running once through from the beginning through + the last component to be benchmarked. After this is done, a token is written + out to `benchmark-setup-token.txt` in the output directory, flagging that + the complete checkpointed pipeline has been created without errors and is + ready to use. If the template setup finds this token file, all this work + is assumed to be already done and is skipped. + + Parameters + ---------- + example_name : str + The name of the example to benchmark, as used in + the `activitysim create` command. + component_names : Sequence + The names of the model components to be individually + benchmarked. This list does not need to include all + the components usually included in the example. + benchmark_settings : Mapping + Settings values to override from their usual values + in the example. + benchmark_network_los : Mapping + Network LOS values to override from their usual values + in the example. + config_dirs : Sequence + data_dir : str + output_dir : str + settings_filename : str + skip_component_names : Sequence, optional + Skip running these components when setting up the + benchmarks (i.e. in pre-run). + config_overload_dir : str, default 'dynamic_configs' + """ + try: + os.makedirs(model_dir(), exist_ok=True) + get_example( + example_name=example_name, + destination=model_dir(), + benchmarking=True, + ) + os.makedirs(model_dir(example_name, config_overload_dir), exist_ok=True) + + # Find the settings file and extract the complete set of models included + from ..core.config import read_settings_file + + try: + existing_settings, settings_filenames = read_settings_file( + settings_filename, + mandatory=True, + include_stack=True, + configs_dir_list=[model_dir(example_name, c) for c in config_dirs], + ) + except Exception: + logger.error(f"os.getcwd:{os.getcwd()}") + raise + if "models" not in existing_settings: + raise ValueError( + f"missing list of models from {config_dirs}/{settings_filename}" + ) + models = existing_settings["models"] + use_multiprocess = existing_settings.get("multiprocess", False) + for k in existing_settings: + print(f"existing_settings {k}:", existing_settings[k]) + + settings_changes = dict( + benchmarking=True, + checkpoints=True, + trace_hh_id=None, + chunk_training_mode="disabled", + inherit_settings=True, + ) + settings_changes.update(benchmark_settings) + + # Pre-run checkpointing or Multiprocess timing runs only need to + # include models up to the penultimate component to be benchmarked. 
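+        # For illustration (hypothetical model list): with
+        #   models = ["initialize", "school_location", "workplace_location", "cdap_simulate"]
+        # and component_names = ["school_location", "workplace_location"],
+        # last_component_to_benchmark ends up as 2, so the single-process
+        # pre-run checkpoints ["initialize", "school_location"]; the
+        # multiprocess case adds one more so the final benchmarked component
+        # is itself included in the timed pre-run.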
+ last_component_to_benchmark = 0 + for cname in component_names: + try: + last_component_to_benchmark = max( + models.index(cname), last_component_to_benchmark + ) + except ValueError: + if cname not in models: + logger.warning( + f"want to benchmark {example_name}.{cname} but it is not in the list of models to run" + ) + else: + raise + if use_multiprocess: + last_component_to_benchmark += 1 + pre_run_model_list = models[:last_component_to_benchmark] + if skip_component_names is not None: + for cname in skip_component_names: + if cname in pre_run_model_list: + pre_run_model_list.remove(cname) + settings_changes["models"] = pre_run_model_list + + if "multiprocess_steps" in existing_settings: + multiprocess_steps = existing_settings["multiprocess_steps"] + while ( + multiprocess_steps[-1].get("begin", "missing-begin") + not in pre_run_model_list + ): + multiprocess_steps = multiprocess_steps[:-1] + if len(multiprocess_steps) == 0: + break + settings_changes["multiprocess_steps"] = multiprocess_steps + + with open( + model_dir(example_name, config_overload_dir, settings_filename), "wt" + ) as yf: + try: + yaml.safe_dump(settings_changes, yf) + except Exception: + logger.error(f"settings_changes:{str(settings_changes)}") + logger.exception("oops") + raise + with open( + model_dir(example_name, config_overload_dir, "network_los.yaml"), "wt" + ) as yf: + benchmark_network_los["inherit_settings"] = True + yaml.safe_dump(benchmark_network_los, yf) + + os.makedirs(model_dir(example_name, output_dir), exist_ok=True) + + # Running the model through all the steps and checkpointing everywhere is + # expensive and only needs to be run once. Once it is done we will write + # out a completion token file to indicate to future benchmark attempts + # that this does not need to be repeated. Developers should manually + # delete the token (or the whole model file) when a structural change + # in the model happens such that re-checkpointing is needed (this should + # happen rarely). + use_config_dirs = (config_overload_dir, *config_dirs) + token_file = model_dir(example_name, output_dir, "benchmark-setup-token.txt") + if not os.path.exists(token_file) and not use_multiprocess: + try: + pre_run( + model_dir(example_name), + use_config_dirs, + data_dir, + output_dir, + settings_filename, + ) + except Exception as err: + with open( + model_dir(example_name, output_dir, "benchmark-setup-error.txt"), + "wt", + ) as f: + f.write(f"error {err}") + f.write(traceback.format_exc()) + raise + else: + with open(token_file, "wt") as f: + # We write the commit into the token, in case that is useful + # to developers to decide if the checkpointed pipeline is + # out of date. + asv_commit = os.environ.get("ASV_COMMIT", "ASV_COMMIT_UNKNOWN") + f.write(asv_commit) + if use_multiprocess: + # Multiprocessing timing runs are actually fully completed within + # the setup_cache step, and component-level timings are written out + # to log files by activitysim during this run. 
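+            # The commit hash (read from the ASV_COMMIT environment variable
+            # when the benchmark runner provides it) is embedded in the error
+            # file name below, so a failed multiprocess run can be traced back
+            # to the commit that produced it.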
+ asv_commit = os.environ.get("ASV_COMMIT", "ASV_COMMIT_UNKNOWN") + try: + pre_run( + model_dir(example_name), + use_config_dirs, + data_dir, + output_dir, + settings_filename, + ) + run_multiprocess() + except Exception as err: + with open( + model_dir( + example_name, output_dir, f"-mp-run-error-{asv_commit}.txt" + ), + "wt", + ) as f: + f.write(f"error {err}") + f.write(traceback.format_exc()) + raise + + except Exception as err: + logger.error( + f"error in template_setup_cache({example_name}):\n" + traceback.format_exc() + ) + raise + + +def template_component_timings( + module_globals, + component_names, + example_name, + config_dirs, + data_dir, + output_dir, + preload_injectables, + repeat_=(1, 20, 10.0), # min_repeat, max_repeat, max_time_seconds + number_=1, + timeout_=36000.0, # ten hours, + version_="1", +): + """ + Inject ComponentTiming classes into a module namespace for benchmarking a model. + + Arguments with a trailing underscore get passed through to airspeed velocity, see + https://asv.readthedocs.io/en/stable/benchmarks.html?highlight=repeat#timing-benchmarks + for more info on these. + + Parameters + ---------- + module_globals : Mapping + The module globals namespace, into which the timing classes are written. + component_names : Iterable[str] + Names of components to benchmark. + example_name : str + Name of the example model being benchmarked, as it appears in the + exammple_manifest.yaml file. + config_dirs : Tuple[str] + Config directories to use when running the model being benchmarked. + data_dir, output_dir : str + Data and output directories to use when running the model being + benchmarked. + preload_injectables : Tuple[str] + Names of injectables to pre-load (typically skims). + repeat_ : tuple + The values for (min_repeat, max_repeat, max_time_seconds). See ASV docs + for more information. + number_ : int, default 1 + The number of iterations in each sample. Generally this should stay + set to 1 for ActivitySim timing. + timeout_ : number, default 36000.0, + How many seconds before the benchmark is assumed to have crashed. The + typical default for airspeed velocity is 60 but that is wayyyyy too short for + ActivitySim, so the default here is set to ten hours. + version_ : str + Used to determine when to invalidate old benchmark results. Benchmark results + produced with a different value of the version than the current value will be + ignored. + """ + + for componentname in component_names: + + class ComponentTiming: + component_name = componentname + warmup_time = 0 + min_run_count = 1 + processes = 1 + repeat = repeat_ + number = number_ + timeout = timeout_ + + def setup(self): + setup_component( + self.component_name, + model_dir(example_name), + preload_injectables, + config_dirs, + data_dir, + output_dir, + ) + + def teardown(self): + teardown_component(self.component_name) + + def time_component(self): + run_component(self.component_name) + + time_component.pretty_name = f"{example_name}:{componentname}" + time_component.version = version_ + + ComponentTiming.__name__ = f"{componentname}" + + module_globals[componentname] = ComponentTiming + + +def template_component_timings_mp( + module_globals, + component_names, + example_name, + output_dir, + pretty_name, + version_="1", +): + """ + Inject ComponentTiming classes into a module namespace for benchmarking a model. 
+ + This "MP" version for multiprocessing doesn't actually measure the time taken, + but instead it parses the run logs from a single full run of the mode, to + extract the per-component timings. Most of the configurability has been removed + compared to the single-process version of this function. + + Parameters + ---------- + module_globals : Mapping + The module globals namespace, into which the timing classes are written. + component_names : Iterable[str] + Names of components to benchmark. + example_name : str + Name of the example model being benchmarked, as it appears in the + exammple_manifest.yaml file. + output_dir : str + Output directory to use when running the model being benchmarked. + pretty_name : str + A "pretty" name for this set of benchmarks. + version_ : str + Used to determine when to invalidate old benchmark results. Benchmark results + produced with a different value of the version than the current value will be + ignored. + """ + + for componentname in component_names: + + class ComponentTiming: + component_name = componentname + + def track_component(self): + durations = [] + inject.add_injectable("output_dir", model_dir(example_name, output_dir)) + logfiler = config.log_file_path(f"timing_log.mp_households_*.csv") + for logfile in glob.glob(logfiler): + df = pd.read_csv(logfile) + dfq = df.query(f"component_name=='{self.component_name}'") + if len(dfq): + durations.append(dfq.iloc[-1].duration) + if len(durations): + return np.mean(durations) + else: + raise ValueError("no results available") + + track_component.pretty_name = f"{pretty_name}:{componentname}" + track_component.version = version_ + track_component.unit = "s" + + ComponentTiming.__name__ = f"{componentname}" + + module_globals[componentname] = ComponentTiming diff --git a/activitysim/benchmarking/latest.py b/activitysim/benchmarking/latest.py new file mode 100644 index 0000000000..a8b02e8ca4 --- /dev/null +++ b/activitysim/benchmarking/latest.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, division, print_function, unicode_literals + +import logging +import subprocess +import traceback +import shlex +from asv.console import log +from asv import util +from asv.commands.run import Run +from asv.commands import common_args + + +def _do_build(args): + env, conf, repo, commit_hash = args + try: + with log.set_level(logging.WARN): + env.install_project(conf, repo, commit_hash) + except util.ProcessError: + return (env.name, False) + return (env.name, True) + + +def _do_build_multiprocess(args): + """ + multiprocessing callback to build the project in one particular + environment. + """ + try: + return _do_build(args) + except BaseException as exc: + raise util.ParallelFailure(str(exc), exc.__class__, traceback.format_exc()) + + +class Latest(Run): + @classmethod + def setup_arguments(cls, subparsers): + parser = subparsers.add_parser( + "latest", + help="Run a benchmark suite on the HEAD commit", + description="Run a benchmark suite.", + ) + + common_args.add_bench(parser) + parser.add_argument( + "--profile", + "-p", + action="store_true", + help="""In addition to timing, run the benchmarks through + the `cProfile` profiler and store the results.""", + ) + common_args.add_parallel(parser) + common_args.add_show_stderr(parser) + parser.add_argument( + "--quick", + "-q", + action="store_true", + help="""Do a "quick" run, where each benchmark function is + run only once. This is useful to find basic errors in the + benchmark functions faster. 
The results are unlikely to + be useful, and thus are not saved.""", + ) + common_args.add_environment(parser) + parser.add_argument( + "--set-commit-hash", + default=None, + help="""Set the commit hash to use when recording benchmark + results. This makes results to be saved also when using an + existing environment.""", + ) + common_args.add_launch_method(parser) + parser.add_argument( + "--dry-run", + "-n", + action="store_true", + default=None, + help="""Do not save any results to disk.""", + ) + common_args.add_machine(parser) + parser.add_argument( + "--skip-existing-successful", + action="store_true", + help="""Skip running benchmarks that have previous successful + results""", + ) + parser.add_argument( + "--skip-existing-failed", + action="store_true", + help="""Skip running benchmarks that have previous failed + results""", + ) + parser.add_argument( + "--skip-existing-commits", + action="store_true", + help="""Skip running benchmarks for commits that have existing + results""", + ) + parser.add_argument( + "--skip-existing", + "-k", + action="store_true", + help="""Skip running benchmarks that have previous successful + or failed results""", + ) + parser.add_argument( + "--interleave-processes", + action="store_true", + default=False, + help="""Interleave benchmarks with multiple processes across + commits. This can avoid measurement biases from commit ordering, + can take longer.""", + ) + parser.add_argument( + "--no-interleave-processes", + action="store_false", + dest="interleave_processes", + ) + parser.add_argument( + "--no-pull", action="store_true", help="Do not pull the repository" + ) + + parser.set_defaults(func=cls.run_from_args) + + return parser + + @classmethod + def run_from_conf_args(cls, conf, args, **kwargs): + return cls.run( + conf=conf, + range_spec="HEAD^!", + steps=None, + bench=args.bench, + attribute=args.attribute, + parallel=args.parallel, + show_stderr=args.show_stderr, + quick=args.quick, + profile=args.profile, + env_spec=args.env_spec, + set_commit_hash=args.set_commit_hash, + dry_run=args.dry_run, + machine=args.machine, + skip_successful=args.skip_existing_successful or args.skip_existing, + skip_failed=args.skip_existing_failed or args.skip_existing, + skip_existing_commits=args.skip_existing_commits, + record_samples=True, + append_samples=True, + pull=not args.no_pull, + interleave_processes=args.interleave_processes, + launch_method=args.launch_method, + **kwargs + ) + + +class Batch(Run): + @classmethod + def setup_arguments(cls, subparsers): + parser = subparsers.add_parser( + "batch", + help="Run a set of benchmark suites based on a batch file. 
" + "Simply give the file name, which should be a text file " + "containing a number of activitysim benchmark commands.", + description="Run a set of benchmark suites based on a batch file.", + ) + + parser.add_argument( + "file", + action="store", + type=str, + help="""Set the file name to use for reading multiple commands.""", + ) + + parser.set_defaults(func=cls.run_from_args) + + return parser + + @classmethod + def run_from_conf_args(cls, conf, args, **kwargs): + with open(args.file, "rt") as f: + for line in f.readlines(): + subprocess.run(["activitysim", "benchmark", *shlex.split(line)]) diff --git a/activitysim/benchmarking/profile_inspector.py b/activitysim/benchmarking/profile_inspector.py new file mode 100644 index 0000000000..84e9549846 --- /dev/null +++ b/activitysim/benchmarking/profile_inspector.py @@ -0,0 +1,106 @@ +import base64 +import contextlib +import io +import json +import os +import tempfile +import zlib +import traceback +from asv.plugins.snakeviz import SnakevizGui +from asv.commands import Command +from asv.console import log + + +def benchmark_snakeviz(json_record, benchmark=None): + """ + A utility to directly display saved profiling data in Snakeviz. + + Parameters + ---------- + json_record : Path-like + The archived json file that contains profile data for benchmarks. + benchmark : str, optional + The name of the benchmark to display. + """ + from asv import util + + with open(json_record, "rt") as f: + json_content = json.load(f) + profiles = json_content.get("profiles", {}) + if benchmark is None or benchmark not in profiles: + if profiles: + log.info("\n\nAvailable profiles:") + for k in profiles.keys(): + log.info(f"- {k}") + else: + log.info(f"\n\nNo profiles stored in {json_record}") + if benchmark is None: + return + raise KeyError() + profile_data = zlib.decompress(base64.b64decode(profiles[benchmark].encode())) + prefix = benchmark.replace(".", "__") + "." + with temp_profile(profile_data, prefix) as profile_path: + log.info(f"Profiling data cached to {profile_path}") + import pstats + + prof = pstats.Stats(profile_path) + prof.strip_dirs().dump_stats(profile_path + "b") + try: + SnakevizGui.open_profiler_gui(profile_path + "b") + except KeyboardInterrupt: + pass + except Exception: + + traceback.print_exc() + input(input("Press Enter to continue...")) + finally: + os.remove(profile_path + "b") + + +@contextlib.contextmanager +def temp_profile(profile_data, prefix=None): + profile_fd, profile_path = tempfile.mkstemp(prefix=prefix) + try: + with io.open(profile_fd, "wb", closefd=True) as fd: + fd.write(profile_data) + + yield profile_path + finally: + os.remove(profile_path) + + +class ProfileInspector(Command): + @classmethod + def setup_arguments(cls, subparsers): + parser = subparsers.add_parser( + "snakeviz", + help="""Run snakeviz on a particular benchmark that has been profiled.""", + description="Inspect a benchmark profile", + ) + + parser.add_argument( + "json_record", + help="""The json file in the benchmark results to read profile data from.""", + ) + + parser.add_argument( + "benchmark", + help="""The benchmark to profile. Must be a + fully-specified benchmark name. 
For parameterized benchmark, it + must include the parameter combination to use, e.g.: + benchmark_name(param0, param1, ...)""", + default=None, + nargs="?", + ) + + parser.set_defaults(func=cls.run_from_args) + + return parser + + @classmethod + def run_from_conf_args(cls, conf, args, **kwargs): + return cls.run(json_record=args.json_record, benchmark=args.benchmark) + + @classmethod + def run(cls, json_record, benchmark): + benchmark_snakeviz(json_record, benchmark) diff --git a/activitysim/benchmarking/reader.py b/activitysim/benchmarking/reader.py new file mode 100644 index 0000000000..f1da72d706 --- /dev/null +++ b/activitysim/benchmarking/reader.py @@ -0,0 +1,28 @@ +import pandas as pd +import yaml + + +def read_results(json_file): + """ + Read benchmarking results from a single commit on a single machine. + + Parameters + ---------- + json_file : str + Path to the json file containing the target results. + + Returns + ------- + pandas.DataFrame + """ + out_data = {} + with open(json_file, "rt") as f: + in_data = yaml.safe_load(f) + for k, v in in_data["results"].items(): + if v is None: + continue + m, c, _ = k.split(".") + if m not in out_data: + out_data[m] = {} + out_data[m][c] = v["result"][0] + return pd.DataFrame(out_data) diff --git a/activitysim/benchmarking/workspace.py b/activitysim/benchmarking/workspace.py new file mode 100644 index 0000000000..691c4aa31f --- /dev/null +++ b/activitysim/benchmarking/workspace.py @@ -0,0 +1,20 @@ +import os + +_directory = None + + +def get_dir(): + global _directory + if _directory is None: + _directory = os.environ.get("ASIM_ASV_WORKSPACE", None) + if _directory is None: + _directory = os.environ.get("ASV_CONF_DIR", None) + return _directory + + +def set_dir(directory): + global _directory + if directory: + _directory = directory + else: + _directory = os.environ.get("ASIM_ASV_WORKSPACE", None) diff --git a/activitysim/cli/benchmark.py b/activitysim/cli/benchmark.py new file mode 100644 index 0000000000..69c5c6ce49 --- /dev/null +++ b/activitysim/cli/benchmark.py @@ -0,0 +1,292 @@ +import os +import sys +import json +import shutil +import subprocess + +ASV_CONFIG = { + # The version of the config file format. Do not change, unless + # you know what you are doing. + "version": 1, + # The name of the project being benchmarked + "project": "activitysim", + # The project's homepage + "project_url": "https://activitysim.github.io/", + # The URL or local path of the source code repository for the + # project being benchmarked + "repo": ".", + # The tool to use to create environments. + "environment_type": "conda", + # the base URL to show a commit for the project. + "show_commit_url": "http://github.com/ActivitySim/activitysim/commit/", + # The Pythons you'd like to test against. If not provided, defaults + # to the current version of Python used to run `asv`. + # "pythons": ["2.7", "3.6"], + # The list of conda channel names to be searched for benchmark + # dependency packages in the specified order + "conda_channels": ["conda-forge"], + # The matrix of dependencies to test. Each key is the name of a + # package (in PyPI) and the values are version numbers. An empty + # list or empty string indicates to just test against the default + # (latest) version. null indicates that the package is to not be + # installed. 
If the package to be tested is only available from + # PyPi, and the 'environment_type' is conda, then you can preface + # the package name by 'pip+', and the package will be installed via + # pip (with all the conda available packages installed first, + # followed by the pip installed packages). + "matrix": { + "pyarrow": [], + "numpy": [], + "openmatrix": [], + "pandas": [], + "pyyaml": [], + "pytables": [], + "toolz": [], + "orca": [], + "psutil": [], + "requests": [], + "numba": [], + "coverage": [], + "pytest": [], + "cytoolz": [], + }, + # The directory (relative to the current directory) to cache the Python + # environments in. If not provided, defaults to "env" + # "env_dir": "../activitysim-asv/env", + # The directory (relative to the current directory) that raw benchmark + # results are stored in. If not provided, defaults to "results". + # "results_dir": "../activitysim-asv/results", + # The directory (relative to the current directory) that the html tree + # should be written to. If not provided, defaults to "html". + # "html_dir": "../activitysim-asv/html", + # List of branches to benchmark. If not provided, defaults to "master" + # (for git) or "default" (for mercurial). + "branches": ["develop"], +} + + +def make_asv_argparser(parser): + """ + The entry point for asv. + + Most of this work is handed off to the airspeed velocity library. + """ + try: + from asv.commands import common_args, Command, util, command_order + except ImportError: + return + + def help(args): + parser.print_help() + sys.exit(0) + + common_args.add_global_arguments(parser, suppress_defaults=False) + + subparsers = parser.add_subparsers( + title="benchmarking with airspeed velocity", description="valid subcommands" + ) + + help_parser = subparsers.add_parser("help", help="Display usage information") + help_parser.set_defaults(afunc=help) + + commands = dict((x.__name__, x) for x in util.iter_subclasses(Command)) + + hide_commands = [ + "quickstart", + ] + + for command in command_order: + if str(command) in hide_commands: + continue + subparser = commands[str(command)].setup_arguments(subparsers) + common_args.add_global_arguments(subparser) + subparser.add_argument( + "--workspace", + "-w", + help="benchmarking workspace directory", + default=".", + ) + subparser.add_argument( + '--branch', + type=str, + action='append', + metavar='NAME', + help='git branch to include in benchmarking' + ) + del commands[command] + + for name, command in sorted(commands.items()): + if str(command) in hide_commands: + continue + subparser = command.setup_arguments(subparsers) + subparser.add_argument( + "--workspace", + "-w", + help="benchmarking workspace directory", + default=".", + ) + subparser.add_argument( + '--branch', + type=str, + action='append', + metavar='NAME', + help='git branch to include in benchmarking' + ) + common_args.add_global_arguments(subparser) + + from ..benchmarking.latest import Latest, Batch + + subparser = Latest.setup_arguments(subparsers) + subparser.add_argument( + "--workspace", + "-w", + help="benchmarking workspace directory", + default=".", + ) + subparser.add_argument( + '--branch', + type=str, + action='append', + metavar='NAME', + help='git branch to include in benchmarking' + ) + common_args.add_global_arguments(subparser) + + subparser = Batch.setup_arguments(subparsers) + subparser.add_argument( + "--workspace", + "-w", + help="benchmarking workspace directory", + default=".", + ) + + from ..benchmarking.profile_inspector import ProfileInspector + + subparser = 
ProfileInspector.setup_arguments(subparsers) + subparser.add_argument( + "--workspace", + "-w", + help="benchmarking workspace directory", + default=".", + ) + + parser.set_defaults(afunc=benchmark) + return parser, subparsers + + +def benchmark(args): + try: + import asv + except ModuleNotFoundError: + print("airspeed velocity is not installed") + print("try `conda install asv -c conda-forge` if you want to run benchmarks") + sys.exit(1) + from asv.console import log + from asv import util + + log.enable(args.verbose) + + log.info("<== benchmarking activitysim ==>") + + # workspace + args.workspace = os.path.abspath(args.workspace) + + if os.path.abspath(os.path.expanduser("~")) == args.workspace: + log.error( + "don't run benchmarks in the user's home directory \n" + "try changing directories before calling `activitysim benchmark` " + "or use the --workspace option \n" + ) + sys.exit(1) + + if not os.path.isdir(args.workspace): + raise NotADirectoryError(args.workspace) + log.info(f" workspace: {args.workspace}") + os.chdir(args.workspace) + os.environ["ASIM_ASV_WORKSPACE"] = str(args.workspace) + + from ..benchmarking import workspace + + workspace.set_dir(args.workspace) + + from .. import __path__ as pkg_path + + log.info(f" activitysim installation: {pkg_path[0]}") + + repo_dir = os.path.normpath(os.path.join(pkg_path[0], "..")) + git_dir = os.path.normpath(os.path.join(repo_dir, ".git")) + local_git = os.path.exists(git_dir) + log.info(f" local git repo available: {local_git}") + + branches = args.branch + + asv_config = ASV_CONFIG.copy() + if local_git: + repo_dir_rel = os.path.relpath(repo_dir, args.workspace) + log.info(f" local git repo: {repo_dir_rel}") + asv_config["repo"] = repo_dir_rel + if not branches: + # add current branch to the branches to benchmark + current_branch = subprocess.check_output( + ['git', 'branch', '--show-current'], + env={'GIT_DIR': git_dir}, + stdin=None, stderr=None, + shell=False, + universal_newlines=False, + ).decode().strip() + if current_branch: + asv_config["branches"].append(current_branch) + else: + log.info(f" local git repo available: {local_git}") + asv_config["repo"] = "https://github.com/ActivitySim/activitysim.git" + + asv_config["branches"].extend(branches) + + # copy the benchmarks to the workspace, deleting previous files in workspace + import activitysim.benchmarking.benchmarks + + benchmarks_dir = os.path.dirname(activitysim.benchmarking.benchmarks.__file__) + shutil.rmtree( + os.path.join(args.workspace, "benchmarks"), + ignore_errors=True, + ) + shutil.copytree( + benchmarks_dir, + os.path.join(args.workspace, "benchmarks"), + dirs_exist_ok=True, + ) + + # write the asv config to the workspace + conf_file = os.path.normpath(os.path.join(args.workspace, "asv.conf.json")) + with open(conf_file, "wt") as jf: + json.dump(asv_config, jf) + + if args.config and args.config != "asv.conf.json": + raise ValueError( + "activitysim manages the asv config json file itself, do not use --config" + ) + args.config = os.path.abspath(conf_file) + + # write the pre-commit search and replace hook to the workspace + search_replace_file = os.path.normpath( + os.path.join(args.workspace, ".pre-commit-search-and-replace.yaml") + ) + with open(search_replace_file, "wt") as sf: + benchpath = os.path.join(args.workspace, "benchmarks") + if not benchpath.endswith(os.path.sep): + benchpath += os.path.sep + benchpath = benchpath.replace(os.path.sep, r"[/\\]") + sf.write(f"""- search: /{benchpath}/\n replacement: ./\n""") + + try: + result = 
args.func(args) + except util.UserError as e: + log.error(str(e)) + sys.exit(1) + finally: + log.flush() + + if result is None: + result = 0 + + sys.exit(result) diff --git a/activitysim/cli/cli.py b/activitysim/cli/cli.py index 1eff88406f..5756490db7 100644 --- a/activitysim/cli/cli.py +++ b/activitysim/cli/cli.py @@ -12,7 +12,7 @@ def __init__(self, version, description): version=self.version) # print help if no subcommand is provided - self.parser.set_defaults(func=lambda x: self.parser.print_help()) + self.parser.set_defaults(afunc=lambda x: self.parser.print_help()) self.subparsers = self.parser.add_subparsers(title='subcommands', help='available subcommand options') @@ -20,8 +20,8 @@ def __init__(self, version, description): def add_subcommand(self, name, args_func, exec_func, description): subparser = self.subparsers.add_parser(name, description=description) args_func(subparser) - subparser.set_defaults(func=exec_func) + subparser.set_defaults(afunc=exec_func) def execute(self): args = self.parser.parse_args() - args.func(args) + args.afunc(args) diff --git a/activitysim/cli/create.py b/activitysim/cli/create.py index b9e0fe33d1..b9c64b1719 100644 --- a/activitysim/cli/create.py +++ b/activitysim/cli/create.py @@ -81,7 +81,7 @@ def list_examples(): return ret -def get_example(example_name, destination): +def get_example(example_name, destination, benchmarking=False): """ Copy project data to user-specified directory. @@ -101,6 +101,7 @@ def get_example(example_name, destination): If the target directory already exists, project files will be copied into a subdirectory with the same name as the example + benchmarking: bool """ if example_name not in EXAMPLES: sys.exit(f"error: could not find example '{example_name}'") @@ -111,8 +112,11 @@ def get_example(example_name, destination): dest_path = destination example = EXAMPLES[example_name] + itemlist = example.get('include', []) + if benchmarking: + itemlist.extend(example.get('benchmarking', [])) - for item in example.get('include', []): + for item in itemlist: # split include string into source/destination paths items = item.split() @@ -149,11 +153,18 @@ def copy_asset(asset_path, target_path, dirs_exist_ok=False): shutil.copytree(asset_path, target_path, dirs_exist_ok=dirs_exist_ok) else: + target_dir = os.path.dirname(target_path) + if target_dir: + os.makedirs(target_dir, exist_ok=True) shutil.copy(asset_path, target_path) def download_asset(url, target_path, sha256=None): os.makedirs(os.path.dirname(target_path), exist_ok=True) + if url.endswith(".gz") and not target_path.endswith(".gz"): + target_path_dl = target_path + ".gz" + else: + target_path_dl = target_path if sha256 and os.path.isfile(target_path): computed_sha256 = sha256_checksum(target_path) if sha256 == computed_sha256: @@ -167,9 +178,15 @@ def download_asset(url, target_path, sha256=None): print(f'downloading {os.path.basename(target_path)} ...') with requests.get(url, stream=True) as r: r.raise_for_status() - with open(target_path, 'wb') as f: + with open(target_path_dl, 'wb') as f: for chunk in r.iter_content(chunk_size=None): f.write(chunk) + if target_path_dl != target_path: + import gzip + with gzip.open(target_path_dl, 'rb') as f_in: + with open(target_path, 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + os.remove(target_path_dl) computed_sha256 = sha256_checksum(target_path) if sha256 and sha256 != computed_sha256: raise ValueError( diff --git a/activitysim/cli/main.py b/activitysim/cli/main.py index 113e8b9d68..323122c003 100644 --- 
a/activitysim/cli/main.py +++ b/activitysim/cli/main.py @@ -1,13 +1,25 @@ import sys +import os -from activitysim.cli import CLI -from activitysim.cli import run -from activitysim.cli import create -from activitysim import __version__, __doc__ +def main(): + # set all these before we import numpy or any other math library + if len(sys.argv) > 1 and sys.argv[1] == "benchmark": + os.environ['MKL_NUM_THREADS'] = '1' + os.environ['OMP_NUM_THREADS'] = '1' + os.environ['OPENBLAS_NUM_THREADS'] = '1' + os.environ['NUMBA_NUM_THREADS'] = '1' + os.environ['VECLIB_MAXIMUM_THREADS'] = '1' + os.environ['NUMEXPR_NUM_THREADS'] = '1' + + from activitysim.cli import CLI + from activitysim.cli import run + from activitysim.cli import create + from activitysim.cli import benchmark + + from activitysim import __version__, __doc__ -def main(): asim = CLI(version=__version__, description=__doc__) asim.add_subcommand(name='run', @@ -18,4 +30,8 @@ def main(): args_func=create.add_create_args, exec_func=create.create, description=create.create.__doc__) + asim.add_subcommand(name='benchmark', + args_func=benchmark.make_asv_argparser, + exec_func=benchmark.benchmark, + description=benchmark.benchmark.__doc__) sys.exit(asim.execute()) diff --git a/activitysim/cli/run.py b/activitysim/cli/run.py index c4a2ffb14e..87a7d3d929 100644 --- a/activitysim/cli/run.py +++ b/activitysim/cli/run.py @@ -58,6 +58,13 @@ def add_run_args(parser, multiprocess=True): type=int, metavar='BYTES', help='chunk size') + parser.add_argument('--chunk_training_mode', + type=str, + help='chunk training mode, one of [training, adaptive, production, disabled]') + parser.add_argument('--households_sample_size', + type=int, + metavar='N', + help='households sample size') if multiprocess: parser.add_argument('-m', '--multiprocess', @@ -130,6 +137,10 @@ def inject_arg(name, value, cache=False): if args.chunk_size: config.override_setting('chunk_size', int(args.chunk_size)) + if args.chunk_training_mode is not None: + config.override_setting('chunk_training_mode', args.chunk_training_mode) + if args.households_sample_size is not None: + config.override_setting('households_sample_size', args.households_sample_size) for injectable in ['configs_dir', 'data_dir', 'output_dir']: validate_injectable(injectable) diff --git a/activitysim/core/assign.py b/activitysim/core/assign.py index b764d37cce..37236327df 100644 --- a/activitysim/core/assign.py +++ b/activitysim/core/assign.py @@ -269,7 +269,7 @@ def to_series(x): logger.warning("assign_variables target obscures local_d name '%s'", str(target)) if trace_label: - logger.debug(f"{trace_label}.assign_variables {target} = {expression}") + logger.info(f"{trace_label}.assign_variables {target} = {expression}") if is_temp_singular(target) or is_throwaway(target): try: diff --git a/activitysim/core/config.py b/activitysim/core/config.py index b85a02e81a..5d7960873c 100644 --- a/activitysim/core/config.py +++ b/activitysim/core/config.py @@ -333,6 +333,12 @@ def log_file_path(file_name, prefix=True): output_dir = inject.get_injectable('output_dir') + # - check if running asv and if so, log to commit-specific subfolder + asv_commit = os.environ.get('ASV_COMMIT', None) + if asv_commit: + output_dir = os.path.join(output_dir, f'log-{asv_commit}') + os.makedirs(output_dir, exist_ok=True) + # - check for optional log subfolder if os.path.exists(os.path.join(output_dir, 'log')): output_dir = os.path.join(output_dir, 'log') @@ -377,7 +383,7 @@ def __str__(self): return repr(f"Settings file '{self.file_name}' not found in 
{self.configs_dir}") -def read_settings_file(file_name, mandatory=True, include_stack=[], configs_dir_list=None): +def read_settings_file(file_name, mandatory=True, include_stack=False, configs_dir_list=None): """ look for first occurence of yaml file named in directories in configs_dir list, @@ -396,7 +402,7 @@ def read_settings_file(file_name, mandatory=True, include_stack=[], configs_dir_ file_name mandatory: booelan if true, raise SettingsFileNotFound exception if no settings file, otherwise return empty dict - include_stack: boolean + include_stack: boolean or list only used for recursive calls to provide list of files included so far to detect cycles Returns: dict @@ -422,7 +428,10 @@ def backfill_settings(settings, backfill): inheriting = False settings = {} - source_file_paths = include_stack.copy() + if isinstance(include_stack, list): + source_file_paths = include_stack.copy() + else: + source_file_paths = [] for dir in configs_dir_list: file_path = os.path.join(dir, file_name) if os.path.exists(file_path): @@ -480,7 +489,6 @@ def backfill_settings(settings, backfill): assert os.path.join(dir, inherit_file_name) not in source_file_paths, \ f"circular inheritance of {inherit_file_name}: {source_file_paths}: " # make a recursive call to switch inheritance chain to specified file - configs_dir_list = None logger.debug("inheriting additional settings for %s from %s" % (file_name, inherit_file_name)) s, source_file_paths = \ diff --git a/activitysim/core/input.py b/activitysim/core/input.py index 3ae3fd806a..6a864eea8b 100644 --- a/activitysim/core/input.py +++ b/activitysim/core/input.py @@ -193,7 +193,7 @@ def read_from_table_info(table_info): def _read_input_file(filepath, h5_tablename=None, csv_dtypes=None): assert os.path.exists(filepath), 'input file not found: %s' % filepath - if filepath.endswith('.csv'): + if filepath.endswith('.csv') or filepath.endswith('.csv.gz'): return _read_csv_with_fallback_encoding(filepath, csv_dtypes) if filepath.endswith('.h5'): diff --git a/activitysim/core/pipeline.py b/activitysim/core/pipeline.py index 1b2240e6d7..89c5c161cc 100644 --- a/activitysim/core/pipeline.py +++ b/activitysim/core/pipeline.py @@ -81,6 +81,14 @@ def is_open(): return _PIPELINE.is_open +def is_readonly(): + if is_open(): + store = get_pipeline_store() + if store and store._mode == 'r': + return True + return False + + def pipeline_table_key(table_name, checkpoint_name): if checkpoint_name: key = f"{table_name}/{checkpoint_name}" @@ -101,7 +109,7 @@ def close_open_files(): _PIPELINE.open_files.clear() -def open_pipeline_store(overwrite=False): +def open_pipeline_store(overwrite=False, mode='a'): """ Open the pipeline checkpoint store @@ -109,6 +117,17 @@ def open_pipeline_store(overwrite=False): ---------- overwrite : bool delete file before opening (unless resuming) + mode : {'a', 'w', 'r', 'r+'}, default 'a' + ``'r'`` + Read-only; no data can be modified. + ``'w'`` + Write; a new file is created (an existing file with the same + name would be deleted). + ``'a'`` + Append; an existing file is opened for reading and writing, + and if the file does not exist it is created. + ``'r+'`` + It is similar to ``'a'``, but the file must already exist. 
""" if _PIPELINE.pipeline_store is not None: @@ -125,7 +144,7 @@ def open_pipeline_store(overwrite=False): print(e) logger.warning("Error removing %s: %s" % (pipeline_file_path, e)) - _PIPELINE.pipeline_store = pd.HDFStore(pipeline_file_path, mode='a') + _PIPELINE.pipeline_store = pd.HDFStore(pipeline_file_path, mode=mode) logger.debug(f"opened pipeline_store {pipeline_file_path}") @@ -354,8 +373,10 @@ def load_checkpoint(checkpoint_name): i = checkpoints[checkpoints[CHECKPOINT_NAME] == checkpoint_name].index[0] checkpoints = checkpoints.loc[:i] + # if the store is not open in read-only mode, # write it to the store to ensure so any subsequent checkpoints are forgotten - write_df(checkpoints, CHECKPOINT_TABLE_NAME) + if not is_readonly(): + write_df(checkpoints, CHECKPOINT_TABLE_NAME) except IndexError: msg = "Couldn't find checkpoint '%s' in checkpoints" % (checkpoint_name,) @@ -487,7 +508,7 @@ def run_model(model_name): logger.info("##### skipping %s checkpoint for %s" % (step_name, model_name)) -def open_pipeline(resume_after=None): +def open_pipeline(resume_after=None, mode='a'): """ Start pipeline, either for a new run or, if resume_after, loading checkpoint from pipeline. @@ -498,6 +519,9 @@ def open_pipeline(resume_after=None): ---------- resume_after : str or None name of checkpoint to load from pipeline store + mode : {'a', 'w', 'r', 'r+'}, default 'a' + same as for typical opening of H5Store. Ignored unless resume_after + is not None. This is here to allow read-only pipeline for benchmarking. """ if is_open(): @@ -511,7 +535,7 @@ def open_pipeline(resume_after=None): if resume_after: # open existing pipeline logger.debug("open_pipeline - open existing pipeline") - open_pipeline_store(overwrite=False) + open_pipeline_store(overwrite=False, mode=mode) load_checkpoint(resume_after) else: # open new, empty pipeline diff --git a/activitysim/core/skim_dict_factory.py b/activitysim/core/skim_dict_factory.py index 51b1bc59ab..acba205c4c 100644 --- a/activitysim/core/skim_dict_factory.py +++ b/activitysim/core/skim_dict_factory.py @@ -122,7 +122,8 @@ def load_skim_info(self, skim_tag): if self.omx_shape is None: self.omx_shape = tuple(int(i) for i in omx_file.shape()) # sometimes omx shape are floats! 
else: - assert (self.omx_shape == tuple(int(i) for i in omx_file.shape())) + assert (self.omx_shape == tuple(int(i) for i in omx_file.shape())), \ + f"Mismatch shape {self.omx_shape} != {omx_file.shape()}" for skim_name in omx_file.listMatrices(): assert skim_name not in self.omx_manifest, \ diff --git a/activitysim/core/tracing.py b/activitysim/core/tracing.py index 271fc5728d..702cdd8522 100644 --- a/activitysim/core/tracing.py +++ b/activitysim/core/tracing.py @@ -30,6 +30,18 @@ logger = logging.getLogger(__name__) +class ElapsedTimeFormatter(logging.Formatter): + def format(self, record): + duration_milliseconds = record.relativeCreated + hours, rem = divmod(duration_milliseconds / 1000, 3600) + minutes, seconds = divmod(rem, 60) + if hours: + record.elapsedTime = ("{:0>2}:{:0>2}:{:05.2f}".format(int(hours), int(minutes), seconds)) + else: + record.elapsedTime = ("{:0>2}:{:05.2f}".format(int(minutes), seconds)) + return super(ElapsedTimeFormatter, self).format(record) + + def extend_trace_label(trace_label, extension): if trace_label: trace_label = "%s.%s" % (trace_label, extension) @@ -63,9 +75,15 @@ def log_runtime(model_name, start_time=None, timing=None): process_name = multiprocessing.current_process().name - # only log runtime for locutor - if config.setting('multiprocess', False) and not inject.get_injectable('locutor', False): - return + if config.setting('multiprocess', False): + # when benchmarking, log timing for each processes in its own log + if config.setting('benchmarking', False): + header = "component_name,duration" + with config.open_log_file(f'timing_log.{process_name}.csv', 'a', header) as log_file: + print(f"{model_name},{timing}", file=log_file) + # only continue to log runtime in global timing log for locutor + if not inject.get_injectable('locutor', False): + return header = "process_name,model_name,seconds,minutes" with config.open_log_file('timing_log.csv', 'a', header) as log_file: diff --git a/activitysim/estimation/test/test_larch_estimation.py b/activitysim/estimation/test/test_larch_estimation.py index d90c51551c..2cdc0a1a2f 100644 --- a/activitysim/estimation/test/test_larch_estimation.py +++ b/activitysim/estimation/test/test_larch_estimation.py @@ -47,9 +47,11 @@ def _regression_check(dataframe_regression, df, basename=None): # pandas 1.3 handles int8 dtypes as actual numbers, so holdfast needs to be dropped manually # we're dropping it not adding to the regression check so older pandas will also work. basename=basename, - default_tolerance=dict(atol=1e-6, rtol=5e-2) + default_tolerance=dict(atol=1e-6, rtol=0.1) # set a little loose, as there is sometimes a little variance in these - # results when switching backend implementations. + # results when switching backend implementations. We're checking all + # the parameters and the log likelihood, so modest variance in individual + # parameters, especially those with high std errors, is not problematic. 
) diff --git a/activitysim/examples/example_mtc/configs/logging.yaml b/activitysim/examples/example_mtc/configs/logging.yaml index 71ac15cc1f..43a70cb58e 100644 --- a/activitysim/examples/example_mtc/configs/logging.yaml +++ b/activitysim/examples/example_mtc/configs/logging.yaml @@ -36,7 +36,7 @@ logging: console: class: logging.StreamHandler stream: ext://sys.stdout - formatter: simpleFormatter + formatter: elapsedFormatter level: NOTSET formatters: @@ -52,3 +52,7 @@ logging: format: '%(asctime)s - %(levelname)s - %(name)s - %(message)s' datefmt: '%d/%m/%Y %H:%M:%S' + elapsedFormatter: + (): activitysim.core.tracing.ElapsedTimeFormatter + format: '[{elapsedTime}] {levelname:s}: {message:s}' + style: '{' diff --git a/activitysim/examples/example_sandag/configs_3_zone/logging.yaml b/activitysim/examples/example_sandag/configs_3_zone/logging.yaml index 7742c3ece3..93cf6cea93 100644 --- a/activitysim/examples/example_sandag/configs_3_zone/logging.yaml +++ b/activitysim/examples/example_sandag/configs_3_zone/logging.yaml @@ -36,7 +36,7 @@ logging: console: class: logging.StreamHandler stream: ext://sys.stdout - formatter: simpleFormatter + formatter: elapsedFormatter level: INFO formatters: @@ -52,3 +52,7 @@ logging: format: '%(asctime)s - %(levelname)s - %(name)s - %(message)s' datefmt: '%d/%m/%Y %H:%M:%S' + elapsedFormatter: + (): activitysim.core.tracing.ElapsedTimeFormatter + format: '[{elapsedTime}] {levelname:s}: {message:s}' + style: '{' diff --git a/activitysim/examples/example_sandag/configs_benchmarking/settings.yaml b/activitysim/examples/example_sandag/configs_benchmarking/settings.yaml new file mode 100644 index 0000000000..c432ba369f --- /dev/null +++ b/activitysim/examples/example_sandag/configs_benchmarking/settings.yaml @@ -0,0 +1,146 @@ +inherit_settings: True + +# - tracing + +# trace household id; comment out or leave empty for no trace +# households with all tour types +trace_hh_id: + +# trace origin, destination in accessibility calculation; comment out or leave empty for no trace +trace_od: + +# input tables +input_table_list: + - tablename: households + filename: households.csv + index_col: household_id + rename_columns: + HHID: household_id + PERSONS: hhsize + workers: num_workers + VEHICL: auto_ownership + MAZ: home_zone_id + keep_columns: + - home_zone_id + - income + - hhsize + - HHT + - auto_ownership + - num_workers + - tablename: persons + filename: persons.csv + index_col: person_id + rename_columns: + PERID: person_id + keep_columns: + - household_id + - age + - PNUM + - sex + - pemploy + - pstudent + - ptype + - tablename: land_use + filename: land_use.csv + index_col: zone_id + rename_columns: + MAZ: zone_id + COUNTY: county_id + keep_columns: + - TAZ + - DISTRICT + - SD + - county_id + - TOTHH + - TOTPOP + - TOTACRE + - RESACRE + - CIACRE + - TOTEMP + - AGE0519 + - RETEMPN + - FPSEMPN + - HEREMPN + - OTHEMPN + - AGREMPN + - MWTEMPN + - PRKCST + - OPRKCST + - area_type + - HSENROLL + - COLLFTE + - COLLPTE + - TOPOLOGY + - TERMINAL + - access_dist_transit + - tablename: accessibility + filename: cached_accessibility.csv.gz + index_col: zone_id + keep_columns: + - auPkRetail + - auPkTotal + - auOpRetail + - auOpTotal + - trPkRetail + - trPkTotal + - trOpRetail + - trOpTotal + - nmRetail + - nmTotal + +output_tables: + h5_store: False + action: include + prefix: final_ + sort: True + tables: + - checkpoints + - accessibility + - land_use + - households + - persons + - tours + - trips + +models: + - initialize_landuse + - initialize_households + # 
compute_accessibility # use cached table, otherwise overwhelms benchmark runtime + # --- STATIC cache prebuild steps + # single-process step to create attribute_combination list + - initialize_los + # multi-processable step to build STATIC cache + # (this step is a NOP if cache already exists and network_los.rebuild_tvpb_cache setting is False) + - initialize_tvpb + # --- + - school_location + - workplace_location + - auto_ownership_simulate + - free_parking + - cdap_simulate + - mandatory_tour_frequency + - mandatory_tour_scheduling + - joint_tour_frequency + - joint_tour_composition + - joint_tour_participation + - joint_tour_destination + - joint_tour_scheduling + - non_mandatory_tour_frequency + - non_mandatory_tour_destination + - non_mandatory_tour_scheduling + - tour_mode_choice_simulate + - atwork_subtour_frequency + - atwork_subtour_destination + - atwork_subtour_scheduling + - atwork_subtour_mode_choice + - stop_frequency + - trip_purpose + - trip_destination + - trip_purpose_and_destination + - trip_scheduling + - trip_mode_choice + - write_data_dictionary + - track_skim_usage + - write_trip_matrices + - write_tables + diff --git a/activitysim/examples/example_sandag/data_3/cached_accessibility.csv.gz b/activitysim/examples/example_sandag/data_3/cached_accessibility.csv.gz new file mode 100644 index 0000000000..f5bafec20f Binary files /dev/null and b/activitysim/examples/example_sandag/data_3/cached_accessibility.csv.gz differ diff --git a/conda-environments/activitysim-dev.yml b/conda-environments/activitysim-dev.yml index 840aa1b054..373e03c555 100644 --- a/conda-environments/activitysim-dev.yml +++ b/conda-environments/activitysim-dev.yml @@ -5,31 +5,54 @@ channels: dependencies: - python=3.9 - pip -- numpy >= 1.16.1,<=1.21 -- pandas >= 1.1.0 -- pyarrow >= 2.0 -- openmatrix >= 0.3.4.1 -- pyyaml >= 5.1 -- pytables >= 3.5.1 +- asv # for benchmarking +- bump2version # for making a release +- coveralls - cytoolz >= 0.8.1 -- psutil >= 4.1 -- requests >= 2.7 -- numba >= 0.51.2 -- orca >= 1.6 -- larch >= 5.5.8 +- dask +- descartes +- filelock +- fsspec - geopandas +- gh +- git +- ipykernel # so this env will appear in jupyter as a selection - jupyterlab +- larch >= 5.5.8 - matplotlib -- descartes +- myst-parser # allows markdown in sphinx +- nbconvert +- nbformat +- numba >= 0.51.2 +- numexpr +- numpy >= 1.16.1,<=1.21 +- numpydoc +- openmatrix >= 0.3.4.1 +- orca >= 1.6 +- pandas >= 1.1.0 +- pre-commit +- psutil >= 4.1 +- pyarrow >= 2.0 +- pycodestyle +- pydata-sphinx-theme +- pyinstrument +- pypyr +- pytables >=3.5.1,<3.7 - pytest - pytest-cov -- coveralls -- pycodestyle - pytest-regressions -- git -- gh -- bump2version +- pyyaml >= 5.1 +- requests >= 2.7 +- rich +- ruby # required for benchmarking pre-commit hooks - simwrapper > 1.7 +- snakeviz # for profiling +- sphinx +- sphinx_rtd_theme +- sphinx-argparse +- xarray >= 0.21 +- xmle +- zarr - pip: - -e .. diff --git a/conda-environments/activitysim-test-larch.yml b/conda-environments/activitysim-test-larch.yml index 0e3ed4a1b6..529f9ffc2a 100644 --- a/conda-environments/activitysim-test-larch.yml +++ b/conda-environments/activitysim-test-larch.yml @@ -18,6 +18,8 @@ dependencies: - numba >= 0.51.2 - orca >= 1.6 - larch >=5.5.3 +- xarray +- sharrow - pytest - pytest-cov - coveralls diff --git a/docs/benchmarking.rst b/docs/benchmarking.rst new file mode 100644 index 0000000000..bf3b85ddf2 --- /dev/null +++ b/docs/benchmarking.rst @@ -0,0 +1,283 @@ + +.. 
_benchmarking:
+
+Benchmarking
+------------
+
+A key focus of the ActivitySim project is *performance*. It's not enough
+to build a new modeling platform that's mathematically sound and simulates
+travel behavior as expected. It's also required that it do so quickly.
+It's not too hard to run performance tests manually on individual models, and
+doing so after making changes that are expected to *improve* performance is
+typical. But monitoring performance regularly and automatically can help ensure
+that new features do not introduce unexpected performance regressions (i.e.
+models run slower than before). Developing an extensive set of automatic
+performance benchmarks can streamline the former task and solve the latter problem.
+
+ActivitySim includes the ability to run performance benchmarks using a tool
+called `airspeed velocity `__.
+
+The benchmarking process is closely tied to ActivitySim's *git* repository,
+so it is recommended that you use Git to clone the repository from GitHub.
+
+
+Benchmarking Setup
+~~~~~~~~~~~~~~~~~~
+
+The first step in running benchmarks is to have a conda environment for
+benchmarking, as well as a local clone of the main ActivitySim repository,
+plus a clone of the ``asim-benchmarks`` repository. If you plan to submit your
+benchmarking results to the common repository of results, you'll also want to
+make sure that your ``asim-benchmarks`` repository is using a fork of the
+common repository to which you have write-access.
+
+If this isn't already set up on your performance benchmarking machine, you can
+do all of this setup by following these steps::
+
+    conda create -n ASIM-BENCH mamba git gh -c conda-forge --override-channels
+    conda activate ASIM-BENCH
+    gh auth login  # <--- (only needed if gh is not logged in)
+    gh repo clone ActivitySim/activitysim  # TEMPORARY: use jpn--/activitysim
+    cd activitysim
+    git switch develop  # TEMPORARY: use performance1 branch
+    mamba env update --file=conda-environments/activitysim-dev.yml
+    cd ..
+    gh repo fork ActivitySim/asim-benchmarks --remote
+    cd asim-benchmarks
+    python initialize-hooks.py
+
+For non-Windows users, you can then activate the pre-commit hooks like this::
+
+    pre-commit install  # macOS/Linux only, do not run this line on Windows
+
+Windows users should not attempt to use installed pre-commit hooks with conda
+(see note below). Instead, you must manually ``pre-commit run`` inside the correct
+conda environment before committing.
+
+If this environment is set up but it's been a while since you last used it,
+consider updating the environment like this::
+
+    conda activate ASIM-BENCH
+    cd activitysim
+    git switch develop  # TEMPORARY: use performance1 branch
+    mamba env update --file=conda-environments/activitysim-dev.yml
+    cd ..
+    cd asim-benchmarks
+    git pull
+
+Next, we'll want to declare the specs of our benchmarking machine. Some of
+these can be determined quasi-automatically, but we want to confirm the specs
+we'll use, as they are written into the database along with our benchmark results.
+Define machine specs by running this command::
+
+    activitysim benchmark machine
+
+This will start an interactive question-and-answer session to describe your
+computer. Don't be afraid, just answer the questions. The tool may make
+suggestions, but they are not always correct, so check them first and don't just
+accept them all. For example, under "arch" it may suggest "AMD64", but for consistency
+you can change that to "x86_64", which is the same thing by a different name.
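+
+Airspeed velocity records your answers in a small JSON file (typically
+``.asv-machine.json`` in your home directory), so this step only needs to be
+done once per machine. As a rough sketch (the machine name and all values
+shown here are purely illustrative, not a required layout), the saved entry
+might look something like this::
+
+    {
+        "LUMBERJACK": {
+            "arch": "x86_64",
+            "cpu": "Intel Core i9-9900K",
+            "machine": "LUMBERJACK",
+            "num_cpu": "8",
+            "os": "Windows 10",
+            "ram": "68719476736"
+        },
+        "version": 1
+    }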
+
+Running Benchmarks
+~~~~~~~~~~~~~~~~~~
+
+ActivitySim automates the process of running many benchmarks. It can also easily
+accumulate and analyze benchmark results across many different machines, as long as the
+benchmarks are all run in the same (relative) place. So before running benchmarks,
+change your working directory (at the command prompt) into the top directory of
+the ``asim-benchmarks`` repository, if you're not already there.
+
+To run all of the benchmarks on the most recent commit in the main ActivitySim repo::
+
+    activitysim benchmark latest
+
+.. important::
+
+    The benchmarks do not currently use ActivitySim's dynamic chunking features,
+    as these require manual configuration and training on a per-machine basis
+    to ensure good performance.
+
+    Running the complete suite of benchmarks currently includes downloading and
+    running full-region model data for several different SANDAG zone systems.
+    Ideally you should have at least 50 GB of free disk space and 120 GB of RAM
+    to attempt this process on any given machine. For a smaller machine, consider
+    benchmarking only the "test" sized examples, by adding ``--bench sandag.example``
+    to this command, as discussed below.
+
+This will run the benchmarks only on the "HEAD" commit of the main activitysim git
+repository. To run on one or more other historical commits from the git history, you can
+specify an individual commit or a range, in the same way you would do so for the
+``git log`` command. For example, to run benchmarks on the commits to develop since
+it was branched off master, run::
+
+    activitysim benchmark run master..develop
+
+or to run only on the latest commit in develop, run::
+
+    activitysim benchmark run "develop^!"
+
+Note that the literal quotation marks are necessary on Windows, as the caret character
+preceding the exclamation mark is otherwise interpreted as an escape character.
+In most other shells (e.g. on Linux or macOS) the literal quotation marks are unnecessary.
+
+To run only benchmarks from a certain example, we can
+use the ``--bench`` argument, which allows us to write a "regular expression" that
+filters the benchmarks actually executed. This is handy if you are interested in
+benchmarking a particular model or component, as running *all* the benchmarks can
+take a very long time, and the larger benchmarks (e.g. on the full SANDAG model)
+will need a lot of disk space and RAM. For example, to run only the mandatory
+tour frequency benchmark for the SANDAG 1-Zone example-sized system, run::
+
+    activitysim benchmark latest --bench sandag1example.time_mandatory_tour_frequency
+
+The "." character here means a literal dot, but since this is a regular expression,
+it is also a single-character wildcard. Thus, you can run all the example-sized
+SANDAG benchmarks with::
+
+    activitysim benchmark latest --bench sandag.example
+
+You can also repeat the ``--bench`` argument to give multiple different expressions.
+So, you can run just the 1- and 2-zone examples, without the 3-zone example::
+
+    activitysim benchmark latest --bench sandag1example --bench sandag2example
+
+If you want to run several different benchmarking commands together, for example
+to run a curated subset of interesting benchmarks, the benchmark tool also
+includes a ``batch`` mode. You can assemble the various commands you would run
+(i.e. everything you would type on the command line after "activitysim benchmark")
+into a text file, and then point to that file using the ``batch`` command::
+
+    activitysim benchmark batch my_interesting_benchmarks.txt
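+
+Assuming one set of arguments per line (everything after ``activitysim benchmark``),
+an illustrative ``my_interesting_benchmarks.txt``, built only from the commands
+shown above, might contain::
+
+    latest --bench sandag1example --bench sandag2example
+    latest --bench sandag1example.time_mandatory_tour_frequency
+    run "develop^!" --bench sandag.example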
+
+
+Threading Limits
+~~~~~~~~~~~~~~~~
+
+When you run benchmarking using the ``activitysim benchmark`` command, the
+following environment variables are set automatically before benchmarking begins::
+
+    MKL_NUM_THREADS = 1
+    OMP_NUM_THREADS = 1
+    OPENBLAS_NUM_THREADS = 1
+    NUMBA_NUM_THREADS = 1
+    VECLIB_MAXIMUM_THREADS = 1
+    NUMEXPR_NUM_THREADS = 1
+
+This ensures that all benchmarking operations run processes in single-threaded
+mode, while still allowing ActivitySim itself to spin up multiple processes if the
+item being timed is a multiprocess benchmark.
+
+Submitting Benchmarks
+~~~~~~~~~~~~~~~~~~~~~
+
+One of the useful features of the airspeed velocity benchmarking engine is the
+opportunity to compare performance benchmarks across different machines. The
+ActivitySim community is interested in aggregating such results from a number
+of participants, so once you have successfully run a set of benchmarks, you
+should submit those results to our repository.
+
+To do so, assuming you have run the benchmark tool inside the ``asim-benchmarks``
+repository as noted above, you simply need to commit any new or changed files
+in the ``asim-benchmarks/results`` directory. You can then open a pull request
+against the community ``asim-benchmarks`` to submit those results.
+
+Assuming you are in (or first ``cd`` into) the ``asim-benchmarks`` directory, you can
+do this from the command line using the following steps::
+
+    git add results
+    pre-commit run  # required on Windows only, see note
+    git commit -m "adding benchmark results"
+    git push
+    gh pr create
+
+.. note::
+
+    On Windows, the process for automatically running pre-commit hooks when
+    making a Git commit is not compatible with conda, see
+    `here `. This will
+    probably never be fixed, as the developers of pre-commit and conda each
+    feel that the "bug" is in the other library. So, manually running the
+    pre-commit step is required.
+
+Users may find it simpler to skip the last step on the command line, and simply
+visit their fork on GitHub.com to use the web interface to open a pull request.
+
+Publishing to GitHub Pages
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Publishing the standard airspeed velocity content to GitHub Pages is a built-in
+feature of the command line tool, available to users who have write-access to the
+``asim-benchmarks`` GitHub repository. Be sure you have all the relevant branches
+tracked locally (especially master and develop) and then run::
+
+    activitysim benchmark gh-pages
+
+
+Profiling
+~~~~~~~~~
+
+The benchmarking tool can also be used for profiling, which allows a developer to
+inspect the timings for various commands *inside* a particular benchmark. This is
+most conveniently accomplished using the ``snakeviz`` tool, which should be installed
+in the developer tools environment (``conda install snakeviz -c conda-forge``).
+Then, the developer needs to run two commands to compute and view the component
+profile.
+
+To create a profile record when benchmarking, add the ``--profile`` option when
+running the benchmarks.
+For example, to create profile records for the SANDAG
+example-sized model's non-mandatory tour scheduling component across all three
+zone systems, run::
+
+    activitysim benchmark latest --bench sandag.example.non_mandatory_tour_scheduling --profile
+
+This command will save the profiling data directly into the json file that stores
+the benchmark timings. This is a lot of extra data, so it's not advised to
+save profiling data for every benchmark, but only for benchmarks of particular
+interest.
+
+Once this data has been saved, you can access it using the ``snakeviz`` tool. This
+visualization requires pointing to a specific profiled benchmark in a specific
+json result file. For example::
+
+    activitysim benchmark snakeviz results/LUMBERJACK/241ddb64-env-c87ac846ee78e51351a06682de5adcb5.json sandag3example.non_mandatory_tour_scheduling.time_component
+
+On running this command, a web browser should pop open to display the snakeviz
+interface.
+
+Writing New Benchmarks
+~~~~~~~~~~~~~~~~~~~~~~
+
+New benchmarks for other model examples can be added to
+``activitysim/benchmarking/benchmarks``. A basic template structure has been used,
+so that it should be relatively straightforward to implement component-level
+single-thread benchmarks for any model that is available using the
+``activitysim create`` tool.
+
+A basic framework for multiprocessing benchmarks has been implemented and is
+demonstrated in the ``mtc1mp4`` benchmark file. However, work remains to write
+a stable process to execute chunking training for each machine prior to running
+the production-version benchmarks that will be meaningful for users.
+
+Running Benchmarks for Pull Requests
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The complete set of performance benchmarks is too large to include in ActivitySim's
+automatic continuous integration (CI) testing, in both compute time and memory usage.
+However, it is valuable to run these tests once against the final version of
+each PR before merging into the ``develop`` branch, to ensure there are no
+unexpected performance regressions. The airspeed velocity tools include a special
+CI mode, which runs the same benchmarks on the same machine with the same settings,
+giving developers a fair shot at a strict apples-to-apples comparison of performance.
+
+This mode can be activated to check the performance of code on a git branch called
+``my-new-feature-branch`` and compare it against the ``develop`` branch like this::
+
+    activitysim benchmark continuous develop my-new-feature-branch
+
+Unlike other tests for mathematical correctness, it is not always necessary that
+new PRs "pass" this testing, as new features or capabilities may justify a
+performance degradation. But developers should always run these tests on new PRs
+so that the community is aware of the trade-offs (if any) and can take steps to
+mitigate problems promptly if desired.
+
diff --git a/docs/index.rst b/docs/index.rst index ff5fdb6eac..1a3be50352 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -32,6 +32,7 @@ Contents estimation howitworks development + benchmarking Indices and tables