From def96d6a31557abe8551f0c4eb19c6e3f7f95395 Mon Sep 17 00:00:00 2001 From: Jeff Newman Date: Fri, 2 Jul 2021 09:23:40 -0500 Subject: [PATCH 1/6] feat: chunkless operation It appears that I can't currently run a model with chunking turned off unless it's in training mode, but training mode carries a lot of overhead monitoring memory usage. --- activitysim/core/chunk.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/activitysim/core/chunk.py b/activitysim/core/chunk.py index 252b780e19..628e1526ec 100644 --- a/activitysim/core/chunk.py +++ b/activitysim/core/chunk.py @@ -74,7 +74,8 @@ MODE_RETRAIN = 'training' MODE_ADAPTIVE = 'adaptive' MODE_PRODUCTION = 'production' -TRAINING_MODES = [MODE_RETRAIN, MODE_ADAPTIVE, MODE_PRODUCTION] +MODE_CHUNKLESS = 'disabled' +TRAINING_MODES = [MODE_RETRAIN, MODE_ADAPTIVE, MODE_PRODUCTION, MODE_CHUNKLESS] # # low level @@ -135,7 +136,9 @@ def chunk_metric(): def chunk_training_mode(): training_mode = \ SETTINGS.setdefault('chunk_training_mode', config.setting('chunk_training_mode', MODE_ADAPTIVE)) - assert training_mode in TRAINING_MODES, f"chunk_training_mode '{training_mode} not one of: {TRAINING_MODES}" + if not training_mode: + training_mode = MODE_CHUNKLESS + assert training_mode in TRAINING_MODES, f"chunk_training_mode '{training_mode}' not one of: {TRAINING_MODES}" return training_mode @@ -223,8 +226,9 @@ def consolidate_logs(): if not glob_files: return - assert chunk_training_mode() != MODE_PRODUCTION, \ - f"shouldn't be any chunk log files when chunk_training_mode is {MODE_PRODUCTION}" + assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS), \ + f"shouldn't be any chunk log files when chunk_training_mode" \ + f" is {MODE_PRODUCTION} or {MODE_CHUNKLESS}" # # OMNIBUS_LOG_FILE @@ -331,6 +335,9 @@ def load_cached_history(self): else: self.have_cached_history = False + if chunk_training_mode() == MODE_CHUNKLESS: + return + if chunk_training_mode() == 
MODE_PRODUCTION: # raise RuntimeError(f"chunk_training_mode is {MODE_PRODUCTION} but no chunk_cache: {chunk_cache_path}") @@ -379,7 +386,7 @@ def cached_row_size(self, chunk_tag): def write_history(self, history, chunk_tag): - assert chunk_training_mode() != MODE_PRODUCTION + assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS) history_df = pd.DataFrame.from_dict(history) @@ -418,7 +425,7 @@ def __init__(self, trace_label, chunk_size, baseline_rss, baseline_uss, headroom def audit(self, msg, bytes=0, rss=0, uss=0, from_rss_monitor=False): - assert chunk_training_mode() != MODE_PRODUCTION + assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS) MAX_OVERDRAFT = 0.2 @@ -483,7 +490,7 @@ def size_it(df): assert False return elements, bytes - assert chunk_training_mode() != MODE_PRODUCTION + assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS) if df is None: elements, bytes = (0, 0) @@ -511,7 +518,7 @@ def size_it(df): def check_local_hwm(self, hwm_trace_label, rss, uss, total_bytes): - assert chunk_training_mode() != MODE_PRODUCTION + assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS) from_rss_monitor = total_bytes is None @@ -563,6 +570,10 @@ def get_hwm_bytes(self): def log_rss(trace_label, force=False): + if chunk_training_mode() == MODE_CHUNKLESS: + # no memory tracing at all in chunkless mode + return + assert len(CHUNK_LEDGERS) > 0, f"log_rss called without current chunker." hwm_trace_label = f"{trace_label}.log_rss" @@ -584,7 +595,7 @@ def log_df(trace_label, table_name, df): assert len(CHUNK_LEDGERS) > 0, f"log_df called without current chunker." 
- if chunk_training_mode() == MODE_PRODUCTION: + if chunk_training_mode() in (MODE_PRODUCTION, MODE_CHUNKLESS): return op = 'del' if df is None else 'add' @@ -679,7 +690,8 @@ def __init__(self, chunk_tag, trace_label, num_choosers=0, chunk_size=0): def close(self): - if ((self.depth == 1) or WRITE_SUBCHUNK_HISTORY) and (chunk_training_mode() != MODE_PRODUCTION): + if ((self.depth == 1) or WRITE_SUBCHUNK_HISTORY) and \ + (chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)): _HISTORIAN.write_history(self.history, self.chunk_tag) _chunk_sizer = CHUNK_SIZERS.pop() @@ -826,7 +838,7 @@ def adaptive_rows_per_chunk(self, i): # input() - if chunk_training_mode() != MODE_PRODUCTION: + if chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS): self.cum_rows += self.rows_per_chunk return self.rows_per_chunk, estimated_number_of_chunks From 61c79071410a45996e4d1e2097c85d3b0c4faf19 Mon Sep 17 00:00:00 2001 From: Jeff Newman Date: Mon, 19 Jul 2021 12:07:30 -0500 Subject: [PATCH 2/6] more aggressive non-memory-logging --- activitysim/core/chunk.py | 61 +++++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/activitysim/core/chunk.py b/activitysim/core/chunk.py index 628e1526ec..4ae26873c6 100644 --- a/activitysim/core/chunk.py +++ b/activitysim/core/chunk.py @@ -579,8 +579,9 @@ def log_rss(trace_label, force=False): hwm_trace_label = f"{trace_label}.log_rss" if chunk_training_mode() == MODE_PRODUCTION: - trace_ticks = 0 if force else mem.MEM_TRACE_TICK_LEN - mem.trace_memory_info(hwm_trace_label, trace_ticks=trace_ticks) + # FIXME - this trace_memory_info call slows things down a lot so it is turned off for now + # trace_ticks = 0 if force else mem.MEM_TRACE_TICK_LEN + # mem.trace_memory_info(hwm_trace_label, trace_ticks=trace_ticks) return rss, uss = mem.trace_memory_info(hwm_trace_label) @@ -593,11 +594,11 @@ def log_rss(trace_label, force=False): def log_df(trace_label, table_name, df): - assert 
len(CHUNK_LEDGERS) > 0, f"log_df called without current chunker." - if chunk_training_mode() in (MODE_PRODUCTION, MODE_CHUNKLESS): return + assert len(CHUNK_LEDGERS) > 0, f"log_df called without current chunker." + op = 'del' if df is None else 'add' hwm_trace_label = f"{trace_label}.{op}.{table_name}" @@ -635,19 +636,27 @@ class ChunkSizer(object): def __init__(self, chunk_tag, trace_label, num_choosers=0, chunk_size=0): self.depth = len(CHUNK_SIZERS) + 1 - self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True) - if self.depth > 1: - # nested chunkers should be unchunked - assert chunk_size == 0 + if chunk_training_mode() != MODE_CHUNKLESS: + if chunk_metric() == USS: + self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True) + else: + self.rss, _ = mem.get_rss(force_garbage_collect=True, uss=False) + self.uss = 0 + + if self.depth > 1: + # nested chunkers should be unchunked + assert chunk_size == 0 - # if we are in a nested call, then we must be in the scope of active Ledger - # so any rss accumulated so far should be attributed to the parent active ledger - assert len(CHUNK_SIZERS) == len(CHUNK_LEDGERS) - parent = CHUNK_SIZERS[-1] - assert parent.chunk_ledger is not None + # if we are in a nested call, then we must be in the scope of active Ledger + # so any rss accumulated so far should be attributed to the parent active ledger + assert len(CHUNK_SIZERS) == len(CHUNK_LEDGERS) + parent = CHUNK_SIZERS[-1] + assert parent.chunk_ledger is not None - log_rss(trace_label) # give parent a complementary log_rss reading entering sub context + log_rss(trace_label) # give parent a complementary log_rss reading entering sub context + else: + self.rss, self.uss = 0, 0 self.chunk_tag = chunk_tag self.trace_label = trace_label @@ -767,7 +776,12 @@ def adaptive_rows_per_chunk(self, i): prev_rss = self.rss prev_uss = self.uss - self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True) + + if chunk_metric() == USS: + self.rss, self.uss 
= mem.get_rss(force_garbage_collect=True, uss=True) + else: + self.rss, _ = mem.get_rss(force_garbage_collect=True, uss=False) + self.uss = 0 self.headroom = self.available_headroom(self.uss if chunk_metric() == USS else self.rss) @@ -846,6 +860,11 @@ def adaptive_rows_per_chunk(self, i): @contextmanager def ledger(self): + # don't do anything in chunkless mode + if chunk_training_mode() == MODE_CHUNKLESS: + yield + return + mem_monitor = None # nested chunkers should be unchunked @@ -911,7 +930,8 @@ def chunk_log(trace_label, chunk_tag=None, base=False): yield - chunk_sizer.adaptive_rows_per_chunk(1) + if chunk_training_mode() != MODE_CHUNKLESS: + chunk_sizer.adaptive_rows_per_chunk(1) chunk_sizer.close() @@ -952,7 +972,8 @@ def adaptive_chunked_choosers(choosers, chunk_size, trace_label, chunk_tag=None) offset += rows_per_chunk - rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) + if chunk_training_mode() != MODE_CHUNKLESS: + rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) chunk_sizer.close() @@ -1046,7 +1067,8 @@ def adaptive_chunked_choosers_and_alts(choosers, alternatives, chunk_size, trace offset += rows_per_chunk alt_offset = alt_end - rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) + if chunk_training_mode() != MODE_CHUNKLESS: + rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) chunk_sizer.close() @@ -1086,6 +1108,7 @@ def adaptive_chunked_choosers_by_chunk_id(choosers, chunk_size, trace_label, chu offset += rows_per_chunk - rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) + if chunk_training_mode() != MODE_CHUNKLESS: + rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) chunk_sizer.close() From 425810fa3ddf61233857f93474398018f5d5fa83 Mon Sep 17 00:00:00 2001 From: Jeff Newman Date: Mon, 19 Jul 2021 14:57:50 -0500 Subject: [PATCH 3/6] fix tests for 
pandas 1.3 --- activitysim/core/config.py | 6 ++++++ activitysim/core/simulate.py | 5 ++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/activitysim/core/config.py b/activitysim/core/config.py index 6d3dc89b49..b85a02e81a 100644 --- a/activitysim/core/config.py +++ b/activitysim/core/config.py @@ -553,6 +553,12 @@ def filter_warnings(): warnings.filterwarnings('ignore', category=DeprecationWarning, module='tables', message='`np.object` is a deprecated alias') + # beginning with pandas version 1.3, various places emit a PerformanceWarning that is + # caught in the "strict" filter above, but which are currently unavoidable for complex models. + # These warnings are left as warnings as an invitation for future enhancement. + from pandas.errors import PerformanceWarning + warnings.filterwarnings('default', category=PerformanceWarning) + def handle_standard_args(parser=None): diff --git a/activitysim/core/simulate.py b/activitysim/core/simulate.py index 937e73c4e7..421cd4599e 100644 --- a/activitysim/core/simulate.py +++ b/activitysim/core/simulate.py @@ -229,7 +229,10 @@ def read_model_coefficient_template(model_settings): # this makes for a more legible template than repeating the identical coefficient name in each column # replace missing cell values with coefficient_name from index - template = template.where(~template.isnull(), template.index) + template = template.where( + ~template.isnull(), + np.broadcast_to(template.index.values[:,None], template.shape), + ) if template.index.duplicated().any(): dupes = template[template.index.duplicated(keep=False)].sort_index() From adcf6b1ae852955fb9fcf74c8056eb7ec1129085 Mon Sep 17 00:00:00 2001 From: Jeff Newman Date: Mon, 19 Jul 2021 15:04:46 -0500 Subject: [PATCH 4/6] pycodestyle --- activitysim/core/simulate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/activitysim/core/simulate.py b/activitysim/core/simulate.py index 421cd4599e..5d627f1ed6 100644 --- a/activitysim/core/simulate.py +++ 
b/activitysim/core/simulate.py @@ -231,7 +231,7 @@ def read_model_coefficient_template(model_settings): # replace missing cell values with coefficient_name from index template = template.where( ~template.isnull(), - np.broadcast_to(template.index.values[:,None], template.shape), + np.broadcast_to(template.index.values[:, None], template.shape), ) if template.index.duplicated().any(): From 3be1ae42bf9f61688c2acf3110b931697bb2adfb Mon Sep 17 00:00:00 2001 From: Jeff Newman Date: Mon, 19 Jul 2021 16:15:00 -0500 Subject: [PATCH 5/6] add chunkless test on mtc --- activitysim/examples/example_mtc/test/test_mtc.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/activitysim/examples/example_mtc/test/test_mtc.py b/activitysim/examples/example_mtc/test/test_mtc.py index 8eccb2c3ea..a81c439cbd 100644 --- a/activitysim/examples/example_mtc/test/test_mtc.py +++ b/activitysim/examples/example_mtc/test/test_mtc.py @@ -15,7 +15,7 @@ def teardown_function(func): inject.reinject_decorated_tables() -def run_test_mtc(multiprocess=False): +def run_test_mtc(multiprocess=False, chunkless=False): def example_path(dirname): resource = os.path.join('examples', 'example_mtc', dirname) @@ -38,6 +38,9 @@ def regress(): if multiprocess: run_args = ['-c', test_path('configs_mp'), '-c', example_path('configs_mp'), '-c', example_path('configs'), '-d', example_path('data'), '-o', test_path('output')] + elif chunkless: + run_args = ['-c', test_path('configs_chunkless'), '-c', example_path('configs'), + '-d', example_path('data'), '-o', test_path('output')] else: run_args = ['-c', test_path('configs'), '-c', example_path('configs'), '-d', example_path('data'), '-o', test_path('output')] @@ -51,6 +54,10 @@ def test_mtc(): run_test_mtc(multiprocess=False) +def test_mtc_chunkless(): + run_test_mtc(multiprocess=False, chunkless=True) + + def test_mtc_mp(): run_test_mtc(multiprocess=True) @@ -59,3 +66,4 @@ def test_mtc_mp(): run_test_mtc(multiprocess=False) 
run_test_mtc(multiprocess=True) + run_test_mtc(multiprocess=False, chunkless=True) From 37862e3d1a7da1ab6971bdecabc9c436460d1ff5 Mon Sep 17 00:00:00 2001 From: Jeff Newman Date: Tue, 20 Jul 2021 06:24:35 -0500 Subject: [PATCH 6/6] chunkless test files --- .../test/configs_chunkless/network_los.yaml | 6 +++++ .../test/configs_chunkless/settings.yaml | 26 +++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 activitysim/examples/example_mtc/test/configs_chunkless/network_los.yaml create mode 100644 activitysim/examples/example_mtc/test/configs_chunkless/settings.yaml diff --git a/activitysim/examples/example_mtc/test/configs_chunkless/network_los.yaml b/activitysim/examples/example_mtc/test/configs_chunkless/network_los.yaml new file mode 100644 index 0000000000..1c4cd79daf --- /dev/null +++ b/activitysim/examples/example_mtc/test/configs_chunkless/network_los.yaml @@ -0,0 +1,6 @@ +inherit_settings: True + +# read cached skims (using numpy memmap) from output directory (memmap is faster than omx ) +read_skim_cache: False +# write memmapped cached skims to output directory after reading from omx, for use in subsequent runs +write_skim_cache: False diff --git a/activitysim/examples/example_mtc/test/configs_chunkless/settings.yaml b/activitysim/examples/example_mtc/test/configs_chunkless/settings.yaml new file mode 100644 index 0000000000..3e670aca94 --- /dev/null +++ b/activitysim/examples/example_mtc/test/configs_chunkless/settings.yaml @@ -0,0 +1,26 @@ +inherit_settings: True + +# treat warnings as errors +strict: True + +# number of households to simulate +households_sample_size: 10 +chunk_size: 0 +chunk_training_mode: disabled + +# - shadow pricing global switches +use_shadow_pricing: False + +# turn writing of sample_tables on and off for all models +# (if True, tables will be written if DEST_CHOICE_SAMPLE_TABLE_NAME is specified in individual model settings) +want_dest_choice_sample_tables: False + +cleanup_pipeline_after_run: True + 
+output_tables: + h5_store: False + action: include + prefix: final_ + sort: True + tables: + - trips