diff --git a/activitysim/core/chunk.py b/activitysim/core/chunk.py index 252b780e19..4ae26873c6 100644 --- a/activitysim/core/chunk.py +++ b/activitysim/core/chunk.py @@ -74,7 +74,8 @@ MODE_RETRAIN = 'training' MODE_ADAPTIVE = 'adaptive' MODE_PRODUCTION = 'production' -TRAINING_MODES = [MODE_RETRAIN, MODE_ADAPTIVE, MODE_PRODUCTION] +MODE_CHUNKLESS = 'disabled' +TRAINING_MODES = [MODE_RETRAIN, MODE_ADAPTIVE, MODE_PRODUCTION, MODE_CHUNKLESS] # # low level @@ -135,7 +136,9 @@ def chunk_metric(): def chunk_training_mode(): training_mode = \ SETTINGS.setdefault('chunk_training_mode', config.setting('chunk_training_mode', MODE_ADAPTIVE)) - assert training_mode in TRAINING_MODES, f"chunk_training_mode '{training_mode} not one of: {TRAINING_MODES}" + if not training_mode: + training_mode = MODE_CHUNKLESS + assert training_mode in TRAINING_MODES, f"chunk_training_mode '{training_mode}' not one of: {TRAINING_MODES}" return training_mode @@ -223,8 +226,9 @@ def consolidate_logs(): if not glob_files: return - assert chunk_training_mode() != MODE_PRODUCTION, \ - f"shouldn't be any chunk log files when chunk_training_mode is {MODE_PRODUCTION}" + assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS), \ + f"shouldn't be any chunk log files when chunk_training_mode" \ + f" is {MODE_PRODUCTION} or {MODE_CHUNKLESS}" # # OMNIBUS_LOG_FILE @@ -331,6 +335,9 @@ def load_cached_history(self): else: self.have_cached_history = False + if chunk_training_mode() == MODE_CHUNKLESS: + return + if chunk_training_mode() == MODE_PRODUCTION: # raise RuntimeError(f"chunk_training_mode is {MODE_PRODUCTION} but no chunk_cache: {chunk_cache_path}") @@ -379,7 +386,7 @@ def cached_row_size(self, chunk_tag): def write_history(self, history, chunk_tag): - assert chunk_training_mode() != MODE_PRODUCTION + assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS) history_df = pd.DataFrame.from_dict(history) @@ -418,7 +425,7 @@ def __init__(self, trace_label, chunk_size, 
baseline_rss, baseline_uss, headroom def audit(self, msg, bytes=0, rss=0, uss=0, from_rss_monitor=False): - assert chunk_training_mode() != MODE_PRODUCTION + assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS) MAX_OVERDRAFT = 0.2 @@ -483,7 +490,7 @@ def size_it(df): assert False return elements, bytes - assert chunk_training_mode() != MODE_PRODUCTION + assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS) if df is None: elements, bytes = (0, 0) @@ -511,7 +518,7 @@ def size_it(df): def check_local_hwm(self, hwm_trace_label, rss, uss, total_bytes): - assert chunk_training_mode() != MODE_PRODUCTION + assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS) from_rss_monitor = total_bytes is None @@ -563,13 +570,18 @@ def get_hwm_bytes(self): def log_rss(trace_label, force=False): + if chunk_training_mode() == MODE_CHUNKLESS: + # no memory tracing at all in chunkless mode + return + assert len(CHUNK_LEDGERS) > 0, f"log_rss called without current chunker." hwm_trace_label = f"{trace_label}.log_rss" if chunk_training_mode() == MODE_PRODUCTION: - trace_ticks = 0 if force else mem.MEM_TRACE_TICK_LEN - mem.trace_memory_info(hwm_trace_label, trace_ticks=trace_ticks) + # FIXME - this trace_memory_info call slows things down a lot so it is turned off for now + # trace_ticks = 0 if force else mem.MEM_TRACE_TICK_LEN + # mem.trace_memory_info(hwm_trace_label, trace_ticks=trace_ticks) return rss, uss = mem.trace_memory_info(hwm_trace_label) @@ -582,11 +594,11 @@ def log_rss(trace_label, force=False): def log_df(trace_label, table_name, df): - assert len(CHUNK_LEDGERS) > 0, f"log_df called without current chunker." - - if chunk_training_mode() == MODE_PRODUCTION: + if chunk_training_mode() in (MODE_PRODUCTION, MODE_CHUNKLESS): return + assert len(CHUNK_LEDGERS) > 0, f"log_df called without current chunker." 
+ op = 'del' if df is None else 'add' hwm_trace_label = f"{trace_label}.{op}.{table_name}" @@ -624,19 +636,27 @@ class ChunkSizer(object): def __init__(self, chunk_tag, trace_label, num_choosers=0, chunk_size=0): self.depth = len(CHUNK_SIZERS) + 1 - self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True) - if self.depth > 1: - # nested chunkers should be unchunked - assert chunk_size == 0 + if chunk_training_mode() != MODE_CHUNKLESS: + if chunk_metric() == USS: + self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True) + else: + self.rss, _ = mem.get_rss(force_garbage_collect=True, uss=False) + self.uss = 0 - # if we are in a nested call, then we must be in the scope of active Ledger - # so any rss accumulated so far should be attributed to the parent active ledger - assert len(CHUNK_SIZERS) == len(CHUNK_LEDGERS) - parent = CHUNK_SIZERS[-1] - assert parent.chunk_ledger is not None + if self.depth > 1: + # nested chunkers should be unchunked + assert chunk_size == 0 - log_rss(trace_label) # give parent a complementary log_rss reading entering sub context + # if we are in a nested call, then we must be in the scope of active Ledger + # so any rss accumulated so far should be attributed to the parent active ledger + assert len(CHUNK_SIZERS) == len(CHUNK_LEDGERS) + parent = CHUNK_SIZERS[-1] + assert parent.chunk_ledger is not None + + log_rss(trace_label) # give parent a complementary log_rss reading entering sub context + else: + self.rss, self.uss = 0, 0 self.chunk_tag = chunk_tag self.trace_label = trace_label @@ -679,7 +699,8 @@ def __init__(self, chunk_tag, trace_label, num_choosers=0, chunk_size=0): def close(self): - if ((self.depth == 1) or WRITE_SUBCHUNK_HISTORY) and (chunk_training_mode() != MODE_PRODUCTION): + if ((self.depth == 1) or WRITE_SUBCHUNK_HISTORY) and \ + (chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)): _HISTORIAN.write_history(self.history, self.chunk_tag) _chunk_sizer = CHUNK_SIZERS.pop() @@ -755,7 
+776,12 @@ def adaptive_rows_per_chunk(self, i): prev_rss = self.rss prev_uss = self.uss - self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True) + + if chunk_metric() == USS: + self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True) + else: + self.rss, _ = mem.get_rss(force_garbage_collect=True, uss=False) + self.uss = 0 self.headroom = self.available_headroom(self.uss if chunk_metric() == USS else self.rss) @@ -826,7 +852,7 @@ def adaptive_rows_per_chunk(self, i): # input() - if chunk_training_mode() != MODE_PRODUCTION: + if chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS): self.cum_rows += self.rows_per_chunk return self.rows_per_chunk, estimated_number_of_chunks @@ -834,6 +860,11 @@ def adaptive_rows_per_chunk(self, i): @contextmanager def ledger(self): + # don't do anything in chunkless mode + if chunk_training_mode() == MODE_CHUNKLESS: + yield + return + mem_monitor = None # nested chunkers should be unchunked @@ -899,7 +930,8 @@ def chunk_log(trace_label, chunk_tag=None, base=False): yield - chunk_sizer.adaptive_rows_per_chunk(1) + if chunk_training_mode() != MODE_CHUNKLESS: + chunk_sizer.adaptive_rows_per_chunk(1) chunk_sizer.close() @@ -940,7 +972,8 @@ def adaptive_chunked_choosers(choosers, chunk_size, trace_label, chunk_tag=None) offset += rows_per_chunk - rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) + if chunk_training_mode() != MODE_CHUNKLESS: + rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) chunk_sizer.close() @@ -1034,7 +1067,8 @@ def adaptive_chunked_choosers_and_alts(choosers, alternatives, chunk_size, trace offset += rows_per_chunk alt_offset = alt_end - rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) + if chunk_training_mode() != MODE_CHUNKLESS: + rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) chunk_sizer.close() @@ -1074,6 +1108,7 @@ def 
adaptive_chunked_choosers_by_chunk_id(choosers, chunk_size, trace_label, chu offset += rows_per_chunk - rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) + if chunk_training_mode() != MODE_CHUNKLESS: + rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i) chunk_sizer.close() diff --git a/activitysim/core/config.py b/activitysim/core/config.py index 6d3dc89b49..b85a02e81a 100644 --- a/activitysim/core/config.py +++ b/activitysim/core/config.py @@ -553,6 +553,12 @@ def filter_warnings(): warnings.filterwarnings('ignore', category=DeprecationWarning, module='tables', message='`np.object` is a deprecated alias') + # beginning with pandas version 1.3, various places emit a PerformanceWarning that is + # caught in the "strict" filter above, but which are currently unavoidable for complex models. + # These warnings are left as warnings as an invitation for future enhancement. + from pandas.errors import PerformanceWarning + warnings.filterwarnings('default', category=PerformanceWarning) + def handle_standard_args(parser=None): diff --git a/activitysim/core/simulate.py b/activitysim/core/simulate.py index 937e73c4e7..5d627f1ed6 100644 --- a/activitysim/core/simulate.py +++ b/activitysim/core/simulate.py @@ -229,7 +229,10 @@ def read_model_coefficient_template(model_settings): # this makes for a more legible template than repeating the identical coefficient name in each column # replace missing cell values with coefficient_name from index - template = template.where(~template.isnull(), template.index) + template = template.where( + ~template.isnull(), + np.broadcast_to(template.index.values[:, None], template.shape), + ) if template.index.duplicated().any(): dupes = template[template.index.duplicated(keep=False)].sort_index() diff --git a/activitysim/examples/example_mtc/test/configs_chunkless/network_los.yaml b/activitysim/examples/example_mtc/test/configs_chunkless/network_los.yaml new file mode 100644 index 
0000000000..1c4cd79daf --- /dev/null +++ b/activitysim/examples/example_mtc/test/configs_chunkless/network_los.yaml @@ -0,0 +1,6 @@ +inherit_settings: True + +# read cached skims (using numpy memmap) from output directory (memmap is faster than omx ) +read_skim_cache: False +# write memmapped cached skims to output directory after reading from omx, for use in subsequent runs +write_skim_cache: False diff --git a/activitysim/examples/example_mtc/test/configs_chunkless/settings.yaml b/activitysim/examples/example_mtc/test/configs_chunkless/settings.yaml new file mode 100644 index 0000000000..3e670aca94 --- /dev/null +++ b/activitysim/examples/example_mtc/test/configs_chunkless/settings.yaml @@ -0,0 +1,26 @@ +inherit_settings: True + +# treat warnings as errors +strict: True + +# number of households to simulate +households_sample_size: 10 +chunk_size: 0 +chunk_training_mode: disabled + +# - shadow pricing global switches +use_shadow_pricing: False + +# turn writing of sample_tables on and off for all models +# (if True, tables will be written if DEST_CHOICE_SAMPLE_TABLE_NAME is specified in individual model settings) +want_dest_choice_sample_tables: False + +cleanup_pipeline_after_run: True + +output_tables: + h5_store: False + action: include + prefix: final_ + sort: True + tables: + - trips diff --git a/activitysim/examples/example_mtc/test/test_mtc.py b/activitysim/examples/example_mtc/test/test_mtc.py index 8eccb2c3ea..a81c439cbd 100644 --- a/activitysim/examples/example_mtc/test/test_mtc.py +++ b/activitysim/examples/example_mtc/test/test_mtc.py @@ -15,7 +15,7 @@ def teardown_function(func): inject.reinject_decorated_tables() -def run_test_mtc(multiprocess=False): +def run_test_mtc(multiprocess=False, chunkless=False): def example_path(dirname): resource = os.path.join('examples', 'example_mtc', dirname) @@ -38,6 +38,9 @@ def regress(): if multiprocess: run_args = ['-c', test_path('configs_mp'), '-c', example_path('configs_mp'), '-c', example_path('configs'), 
'-d', example_path('data'), '-o', test_path('output')] + elif chunkless: + run_args = ['-c', test_path('configs_chunkless'), '-c', example_path('configs'), + '-d', example_path('data'), '-o', test_path('output')] else: run_args = ['-c', test_path('configs'), '-c', example_path('configs'), '-d', example_path('data'), '-o', test_path('output')] @@ -51,6 +54,10 @@ def test_mtc(): run_test_mtc(multiprocess=False) +def test_mtc_chunkless(): + run_test_mtc(multiprocess=False, chunkless=True) + + def test_mtc_mp(): run_test_mtc(multiprocess=True) @@ -59,3 +66,4 @@ def test_mtc_mp(): run_test_mtc(multiprocess=False) run_test_mtc(multiprocess=True) + run_test_mtc(multiprocess=False, chunkless=True)