95 changes: 65 additions & 30 deletions activitysim/core/chunk.py
@@ -74,7 +74,8 @@
MODE_RETRAIN = 'training'
MODE_ADAPTIVE = 'adaptive'
MODE_PRODUCTION = 'production'
TRAINING_MODES = [MODE_RETRAIN, MODE_ADAPTIVE, MODE_PRODUCTION]
MODE_CHUNKLESS = 'disabled'
TRAINING_MODES = [MODE_RETRAIN, MODE_ADAPTIVE, MODE_PRODUCTION, MODE_CHUNKLESS]

#
# low level
@@ -135,7 +136,9 @@ def chunk_metric():
def chunk_training_mode():
training_mode = \
SETTINGS.setdefault('chunk_training_mode', config.setting('chunk_training_mode', MODE_ADAPTIVE))
assert training_mode in TRAINING_MODES, f"chunk_training_mode '{training_mode} not one of: {TRAINING_MODES}"
if not training_mode:
training_mode = MODE_CHUNKLESS
assert training_mode in TRAINING_MODES, f"chunk_training_mode '{training_mode}' not one of: {TRAINING_MODES}"
return training_mode
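
A minimal sketch (not part of this diff) of the new fallback, with resolve_mode as a hypothetical stand-in for chunk_training_mode(), which also consults SETTINGS and config.setting: a missing setting still defaults to adaptive, while an explicitly empty or null setting now resolves to chunkless instead of tripping the assertion.

MODE_ADAPTIVE = 'adaptive'
MODE_CHUNKLESS = 'disabled'

def resolve_mode(setting_value=MODE_ADAPTIVE):
    # hypothetical stand-in: falsy values (None, '') now mean "chunking disabled"
    return setting_value or MODE_CHUNKLESS

assert resolve_mode() == MODE_ADAPTIVE
assert resolve_mode(None) == MODE_CHUNKLESS
assert resolve_mode('') == MODE_CHUNKLESS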


@@ -223,8 +226,9 @@ def consolidate_logs():
if not glob_files:
return

assert chunk_training_mode() != MODE_PRODUCTION, \
f"shouldn't be any chunk log files when chunk_training_mode is {MODE_PRODUCTION}"
assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS), \
f"shouldn't be any chunk log files when chunk_training_mode" \
f" is {MODE_PRODUCTION} or {MODE_CHUNKLESS}"

#
# OMNIBUS_LOG_FILE
@@ -331,6 +335,9 @@ def load_cached_history(self):
else:
self.have_cached_history = False

if chunk_training_mode() == MODE_CHUNKLESS:
return

if chunk_training_mode() == MODE_PRODUCTION:
# raise RuntimeError(f"chunk_training_mode is {MODE_PRODUCTION} but no chunk_cache: {chunk_cache_path}")

@@ -379,7 +386,7 @@ def cached_row_size(self, chunk_tag):

def write_history(self, history, chunk_tag):

assert chunk_training_mode() != MODE_PRODUCTION
assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)

history_df = pd.DataFrame.from_dict(history)

@@ -418,7 +425,7 @@ def __init__(self, trace_label, chunk_size, baseline_rss, baseline_uss, headroom

def audit(self, msg, bytes=0, rss=0, uss=0, from_rss_monitor=False):

assert chunk_training_mode() != MODE_PRODUCTION
assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)

MAX_OVERDRAFT = 0.2

@@ -483,7 +490,7 @@ def size_it(df):
assert False
return elements, bytes

assert chunk_training_mode() != MODE_PRODUCTION
assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)

if df is None:
elements, bytes = (0, 0)
@@ -511,7 +518,7 @@ def size_it(df):

def check_local_hwm(self, hwm_trace_label, rss, uss, total_bytes):

assert chunk_training_mode() != MODE_PRODUCTION
assert chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)

from_rss_monitor = total_bytes is None

@@ -563,13 +570,18 @@ def get_hwm_bytes(self):

def log_rss(trace_label, force=False):

if chunk_training_mode() == MODE_CHUNKLESS:
# no memory tracing at all in chunkless mode
return

assert len(CHUNK_LEDGERS) > 0, f"log_rss called without current chunker."

hwm_trace_label = f"{trace_label}.log_rss"

if chunk_training_mode() == MODE_PRODUCTION:
trace_ticks = 0 if force else mem.MEM_TRACE_TICK_LEN
mem.trace_memory_info(hwm_trace_label, trace_ticks=trace_ticks)
# FIXME - this trace_memory_info call slows things down a lot so it is turned off for now
# trace_ticks = 0 if force else mem.MEM_TRACE_TICK_LEN
# mem.trace_memory_info(hwm_trace_label, trace_ticks=trace_ticks)
return

rss, uss = mem.trace_memory_info(hwm_trace_label)
@@ -582,11 +594,11 @@ def log_rss(trace_label, force=False):

def log_df(trace_label, table_name, df):

assert len(CHUNK_LEDGERS) > 0, f"log_df called without current chunker."

if chunk_training_mode() == MODE_PRODUCTION:
if chunk_training_mode() in (MODE_PRODUCTION, MODE_CHUNKLESS):
return

assert len(CHUNK_LEDGERS) > 0, f"log_df called without current chunker."

op = 'del' if df is None else 'add'
hwm_trace_label = f"{trace_label}.{op}.{table_name}"

@@ -624,19 +636,27 @@ class ChunkSizer(object):
def __init__(self, chunk_tag, trace_label, num_choosers=0, chunk_size=0):

self.depth = len(CHUNK_SIZERS) + 1
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)

if self.depth > 1:
# nested chunkers should be unchunked
assert chunk_size == 0
if chunk_training_mode() != MODE_CHUNKLESS:
if chunk_metric() == USS:
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)
else:
self.rss, _ = mem.get_rss(force_garbage_collect=True, uss=False)
self.uss = 0

# if we are in a nested call, then we must be in the scope of active Ledger
# so any rss accumulated so far should be attributed to the parent active ledger
assert len(CHUNK_SIZERS) == len(CHUNK_LEDGERS)
parent = CHUNK_SIZERS[-1]
assert parent.chunk_ledger is not None
if self.depth > 1:
# nested chunkers should be unchunked
assert chunk_size == 0

log_rss(trace_label) # give parent a complementary log_rss reading entering sub context
# if we are in a nested call, then we must be in the scope of active Ledger
# so any rss accumulated so far should be attributed to the parent active ledger
assert len(CHUNK_SIZERS) == len(CHUNK_LEDGERS)
parent = CHUNK_SIZERS[-1]
assert parent.chunk_ledger is not None

log_rss(trace_label) # give parent a complementary log_rss reading entering sub context
else:
self.rss, self.uss = 0, 0

self.chunk_tag = chunk_tag
self.trace_label = trace_label
@@ -679,7 +699,8 @@ def __init__(self, chunk_tag, trace_label, num_choosers=0, chunk_size=0):

def close(self):

if ((self.depth == 1) or WRITE_SUBCHUNK_HISTORY) and (chunk_training_mode() != MODE_PRODUCTION):
if ((self.depth == 1) or WRITE_SUBCHUNK_HISTORY) and \
(chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS)):
_HISTORIAN.write_history(self.history, self.chunk_tag)

_chunk_sizer = CHUNK_SIZERS.pop()
@@ -755,7 +776,12 @@ def adaptive_rows_per_chunk(self, i):

prev_rss = self.rss
prev_uss = self.uss
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)

if chunk_metric() == USS:
self.rss, self.uss = mem.get_rss(force_garbage_collect=True, uss=True)
else:
self.rss, _ = mem.get_rss(force_garbage_collect=True, uss=False)
self.uss = 0

self.headroom = self.available_headroom(self.uss if chunk_metric() == USS else self.rss)

@@ -826,14 +852,19 @@ def adaptive_rows_per_chunk(self, i):

# input()

if chunk_training_mode() != MODE_PRODUCTION:
if chunk_training_mode() not in (MODE_PRODUCTION, MODE_CHUNKLESS):
self.cum_rows += self.rows_per_chunk

return self.rows_per_chunk, estimated_number_of_chunks

@contextmanager
def ledger(self):

# don't do anything in chunkless mode
if chunk_training_mode() == MODE_CHUNKLESS:
yield
return

mem_monitor = None

# nested chunkers should be unchunked
@@ -899,7 +930,8 @@ def chunk_log(trace_label, chunk_tag=None, base=False):

yield

chunk_sizer.adaptive_rows_per_chunk(1)
if chunk_training_mode() != MODE_CHUNKLESS:
chunk_sizer.adaptive_rows_per_chunk(1)

chunk_sizer.close()

@@ -940,7 +972,8 @@ def adaptive_chunked_choosers(choosers, chunk_size, trace_label, chunk_tag=None)

offset += rows_per_chunk

rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)
if chunk_training_mode() != MODE_CHUNKLESS:
rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)

chunk_sizer.close()

@@ -1034,7 +1067,8 @@ def adaptive_chunked_choosers_and_alts(choosers, alternatives, chunk_size, trace
offset += rows_per_chunk
alt_offset = alt_end

rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)
if chunk_training_mode() != MODE_CHUNKLESS:
rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)

chunk_sizer.close()

@@ -1074,6 +1108,7 @@ def adaptive_chunked_choosers_by_chunk_id(choosers, chunk_size, trace_label, chu

offset += rows_per_chunk

rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)
if chunk_training_mode() != MODE_CHUNKLESS:
rows_per_chunk, estimated_number_of_chunks = chunk_sizer.adaptive_rows_per_chunk(i)

chunk_sizer.close()
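
Taken together, the chunk.py changes follow one pattern: once chunk_training_mode() resolves to 'disabled', every tracking entry point returns (or yields) immediately, so no mem.get_rss calls, ledgers, or chunk-history writes happen. A minimal sketch of that short-circuit with simplified names (this ledger is a stand-in, not the real ChunkSizer.ledger):

from contextlib import contextmanager

MODE_CHUNKLESS = 'disabled'

@contextmanager
def ledger(training_mode):
    if training_mode == MODE_CHUNKLESS:
        # chunkless mode: skip memory monitors, ledgers and high-water-mark bookkeeping
        yield
        return
    # ... otherwise: start the memory monitor and push a ChunkLedger ...
    yield
    # ... then pop the ledger and record high-water marks ...

with ledger(MODE_CHUNKLESS):
    pass  # the chunked body runs, but nothing is measured or recorded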
6 changes: 6 additions & 0 deletions activitysim/core/config.py
@@ -553,6 +553,12 @@ def filter_warnings():
warnings.filterwarnings('ignore', category=DeprecationWarning, module='tables',
message='`np.object` is a deprecated alias')

# beginning with pandas version 1.3, various places emit a PerformanceWarning that is
# caught by the "strict" filter above but is currently unavoidable for complex models.
# These warnings are left as warnings as an invitation for future enhancement.
from pandas.errors import PerformanceWarning
warnings.filterwarnings('default', category=PerformanceWarning)


def handle_standard_args(parser=None):

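
The filter order is what makes this work: filterwarnings() prepends, so the later, more specific filter is consulted first and PerformanceWarning is reported without being escalated. A minimal sketch (not from this PR), assuming the "strict" setting amounts to warnings.filterwarnings('error'), as the "treat warnings as errors" comment in the test settings suggests:

import warnings
from pandas.errors import PerformanceWarning

warnings.filterwarnings('error')                                 # strict: warnings become errors
warnings.filterwarnings('default', category=PerformanceWarning)  # prepended, so checked first

warnings.warn('unavoidable in complex models', PerformanceWarning)  # printed, not raised

try:
    warnings.warn('anything else', UserWarning)
except UserWarning:
    print('still escalated to an error by the strict filter')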
5 changes: 4 additions & 1 deletion activitysim/core/simulate.py
@@ -229,7 +229,10 @@ def read_model_coefficient_template(model_settings):
# this makes for a more legible template than repeating the identical coefficient name in each column

# replace missing cell values with coefficient_name from index
template = template.where(~template.isnull(), template.index)
template = template.where(
~template.isnull(),
np.broadcast_to(template.index.values[:, None], template.shape),
)

if template.index.duplicated().any():
dupes = template[template.index.duplicated(keep=False)].sort_index()
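
A toy illustration (not from this PR) of the fill above: blank template cells take their own row's coefficient_name while populated cells are untouched. Broadcasting the index to the frame's shape makes the intended row-wise fill explicit rather than relying on how a bare Index is interpreted as the replacement argument.

import numpy as np
import pandas as pd

# hypothetical two-segment coefficient template with some cells left blank
template = pd.DataFrame(
    {'segment_1': ['coef_dist_s1', np.nan], 'segment_2': [np.nan, 'coef_income_s2']},
    index=pd.Index(['coef_dist', 'coef_income'], name='coefficient_name'),
)

filled = template.where(
    ~template.isnull(),
    np.broadcast_to(template.index.values[:, None], template.shape),
)
# filled:
#   coef_dist   -> segment_1 'coef_dist_s1', segment_2 'coef_dist'
#   coef_income -> segment_1 'coef_income',  segment_2 'coef_income_s2'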
@@ -0,0 +1,6 @@
inherit_settings: True

# read cached skims (using numpy memmap) from output directory (memmap is faster than omx)
read_skim_cache: False
# write memmapped cached skims to output directory after reading from omx, for use in subsequent runs
write_skim_cache: False
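
As a rough, generic illustration of why the memmap cache described in the comments above is cheap to re-open on subsequent runs (plain numpy; the file name, dtype, and shape here are made up and unrelated to ActivitySim's actual cache layout):

import numpy as np

skims = np.arange(12, dtype=np.float32).reshape(3, 4)

# first run: write the skim data once to a flat memory-mapped file
cache = np.memmap('skim_cache.mmap', dtype=np.float32, mode='w+', shape=skims.shape)
cache[:] = skims
cache.flush()

# later runs: map the same file back without reading it all into memory up front
cached = np.memmap('skim_cache.mmap', dtype=np.float32, mode='r', shape=(3, 4))
assert float(cached[1, 2]) == 6.0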
@@ -0,0 +1,26 @@
inherit_settings: True

# treat warnings as errors
strict: True

# number of households to simulate
households_sample_size: 10
chunk_size: 0
chunk_training_mode: disabled

# - shadow pricing global switches
use_shadow_pricing: False

# turn writing of sample_tables on and off for all models
# (if True, tables will be written if DEST_CHOICE_SAMPLE_TABLE_NAME is specified in individual model settings)
want_dest_choice_sample_tables: False

cleanup_pipeline_after_run: True

output_tables:
h5_store: False
action: include
prefix: final_
sort: True
tables:
- trips
10 changes: 9 additions & 1 deletion activitysim/examples/example_mtc/test/test_mtc.py
@@ -15,7 +15,7 @@ def teardown_function(func):
inject.reinject_decorated_tables()


def run_test_mtc(multiprocess=False):
def run_test_mtc(multiprocess=False, chunkless=False):

def example_path(dirname):
resource = os.path.join('examples', 'example_mtc', dirname)
@@ -38,6 +38,9 @@ def regress():
if multiprocess:
run_args = ['-c', test_path('configs_mp'), '-c', example_path('configs_mp'), '-c', example_path('configs'),
'-d', example_path('data'), '-o', test_path('output')]
elif chunkless:
run_args = ['-c', test_path('configs_chunkless'), '-c', example_path('configs'),
'-d', example_path('data'), '-o', test_path('output')]
else:
run_args = ['-c', test_path('configs'), '-c', example_path('configs'),
'-d', example_path('data'), '-o', test_path('output')]
@@ -51,6 +54,10 @@ def test_mtc():
run_test_mtc(multiprocess=False)


def test_mtc_chunkless():
run_test_mtc(multiprocess=False, chunkless=True)


def test_mtc_mp():
run_test_mtc(multiprocess=True)

@@ -59,3 +66,4 @@ def test_mtc_mp():

run_test_mtc(multiprocess=False)
run_test_mtc(multiprocess=True)
run_test_mtc(multiprocess=False, chunkless=True)