diff --git a/activitysim/core/pipeline.py b/activitysim/core/pipeline.py
index fe3ceca456..9931fb3784 100644
--- a/activitysim/core/pipeline.py
+++ b/activitysim/core/pipeline.py
@@ -4,6 +4,7 @@
 import logging
 import os
 from builtins import map, next, object
+from pathlib import Path
 
 import pandas as pd
 from orca import orca
@@ -37,7 +38,7 @@ class Pipeline(object):
     def __init__(self):
         self.init_state()
 
-    def init_state(self):
+    def init_state(self, pipeline_file_format="parquet"):
 
         # most recent checkpoint
         self.last_checkpoint = {}
@@ -72,7 +73,7 @@ def is_open():
 def is_readonly():
     if is_open():
         store = get_pipeline_store()
-        if store and store._mode == "r":
+        if store and not isinstance(store, Path) and store._mode == "r":
             return True
     return False
 
@@ -99,7 +100,11 @@ def close_open_files():
 
 def open_pipeline_store(overwrite=False, mode="a"):
     """
-    Open the pipeline checkpoint store
+    Open the pipeline checkpoint store.
+
+    If the pipeline_file_name setting ends in ".h5", then the pandas
+    HDFStore file format is used, otherwise pipeline files are stored
+    as parquet files organized in regular file system directories.
 
     Parameters
     ----------
@@ -125,23 +130,36 @@ def open_pipeline_store(overwrite=False, mode="a"):
         inject.get_injectable("pipeline_file_name")
     )
 
-    if overwrite:
-        try:
-            if os.path.isfile(pipeline_file_path):
-                logger.debug("removing pipeline store: %s" % pipeline_file_path)
-                os.unlink(pipeline_file_path)
-        except Exception as e:
-            print(e)
-            logger.warning("Error removing %s: %s" % (pipeline_file_path, e))
+    if pipeline_file_path.endswith(".h5"):
+
+        if overwrite:
+            try:
+                if os.path.isfile(pipeline_file_path):
+                    logger.debug("removing pipeline store: %s" % pipeline_file_path)
+                    os.unlink(pipeline_file_path)
+            except Exception as e:
+                print(e)
+                logger.warning("Error removing %s: %s" % (pipeline_file_path, e))
 
-    _PIPELINE.pipeline_store = pd.HDFStore(pipeline_file_path, mode=mode)
+        _PIPELINE.pipeline_store = pd.HDFStore(pipeline_file_path, mode=mode)
+
+    else:
+        _PIPELINE.pipeline_store = Path(pipeline_file_path)
 
     logger.debug(f"opened pipeline_store {pipeline_file_path}")
 
 
 def get_pipeline_store():
     """
-    Return the open pipeline hdf5 checkpoint store or return None if it not been opened
+    Get the pipeline store.
+
+    If the pipeline filename ends in ".h5" then the legacy HDF5 pipeline
+    is used, otherwise the faster parquet format is used, and the value
+    returned here is just the path to the pipeline directory.
+
+    Returns
+    -------
+    pd.HDFStore or Path
     """
     return _PIPELINE.pipeline_store
 
@@ -181,7 +199,12 @@ def read_df(table_name, checkpoint_name=None):
     """
 
     store = get_pipeline_store()
-    df = store[pipeline_table_key(table_name, checkpoint_name)]
+    if isinstance(store, Path):
+        df = pd.read_parquet(
+            store.joinpath(table_name, f"{checkpoint_name}.parquet"),
+        )
+    else:
+        df = store[pipeline_table_key(table_name, checkpoint_name)]
 
     return df
 
@@ -193,7 +216,11 @@ def write_df(df, table_name, checkpoint_name=None):
     We store multiple versions of all simulation tables, for every checkpoint in which they change,
     so we need to know both the table_name and the checkpoint_name to label the saved table
 
-    The only exception is the checkpoints dataframe, which just has a table_name
+    The only exception is the checkpoints dataframe, which just has a table_name,
+    although when using the parquet storage format this file is stored as "None.parquet"
+    to maintain a simple consistent file directory structure.
+
+    If the
 
     Parameters
     ----------
@@ -209,10 +236,28 @@ def write_df(df, table_name, checkpoint_name=None):
         df.columns = df.columns.astype(str)
 
     store = get_pipeline_store()
-
-    store[pipeline_table_key(table_name, checkpoint_name)] = df
-
-    store.flush()
+    if isinstance(store, Path):
+        store.joinpath(table_name).mkdir(parents=True, exist_ok=True)
+        df.to_parquet(store.joinpath(table_name, f"{checkpoint_name}.parquet"))
+    else:
+        complib = config.setting("pipeline_complib", None)
+        if complib is None or len(df.columns) == 0:
+            # tables with no columns can't be compressed successfully, so to
+            # avoid them getting just lost and dropped they are instead written
+            # in fixed format with no compression, which should be just fine
+            # since they have no data anyhow.
+            store.put(
+                pipeline_table_key(table_name, checkpoint_name),
+                df,
+            )
+        else:
+            store.put(
+                pipeline_table_key(table_name, checkpoint_name),
+                df,
+                "table",
+                complib=complib,
+            )
+        store.flush()
 
 
 def rewrap(table_name, df=None):
@@ -615,7 +660,8 @@ def close_pipeline():
 
     close_open_files()
 
-    _PIPELINE.pipeline_store.close()
+    if not isinstance(_PIPELINE.pipeline_store, Path):
+        _PIPELINE.pipeline_store.close()
 
     _PIPELINE.init_state()
 
@@ -789,12 +835,20 @@ def get_checkpoints():
     store = get_pipeline_store()
 
    if store is not None:
-        df = store[CHECKPOINT_TABLE_NAME]
+        if isinstance(store, Path):
+            df = pd.read_parquet(store.joinpath(CHECKPOINT_TABLE_NAME, "None.parquet"))
+        else:
+            df = store[CHECKPOINT_TABLE_NAME]
     else:
         pipeline_file_path = config.pipeline_file_path(
             orca.get_injectable("pipeline_file_name")
         )
-        df = pd.read_hdf(pipeline_file_path, CHECKPOINT_TABLE_NAME)
+        if pipeline_file_path.endswith(".h5"):
+            df = pd.read_hdf(pipeline_file_path, CHECKPOINT_TABLE_NAME)
+        else:
+            df = pd.read_parquet(
+                Path(pipeline_file_path).joinpath(CHECKPOINT_TABLE_NAME, "None.parquet")
+            )
 
     # non-table columns first (column order in df is random because created from a dict)
     table_names = [name for name in df.columns.values if name not in NON_TABLE_COLUMNS]
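Note (not part of the patch): a minimal sketch of the on-disk layout the parquet branch of `write_df()`/`read_df()` uses, namely `<pipeline_dir>/<table_name>/<checkpoint_name>.parquet`, with the checkpoints table itself written under the key `None` (hence `None.parquet`). The directory name, table name, and sample data below are made-up for illustration; reading and writing parquet requires pyarrow or fastparquet.

```python
# Sketch only: mimics the parquet branch of write_df()/read_df() against a scratch
# directory. "example_pipeline", "households", and "initial_checkpoint" are
# hypothetical names used purely for illustration.
from pathlib import Path

import pandas as pd

store = Path("example_pipeline")  # what get_pipeline_store() returns in parquet mode

# write_df(): one sub-directory per table, one parquet file per checkpoint
df = pd.DataFrame({"household_id": [1, 2, 3], "income": [50000, 61000, 72000]})
store.joinpath("households").mkdir(parents=True, exist_ok=True)
df.to_parquet(store.joinpath("households", "initial_checkpoint.parquet"))

# read_df(): the same path is rebuilt from table_name and checkpoint_name
df_back = pd.read_parquet(store.joinpath("households", "initial_checkpoint.parquet"))
print(df_back)
```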
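Similarly, a hedged sketch of the HDF5 branch's compression fallback: frames with no columns are written with a plain `put()` (fixed format, no compression), everything else as a compressed `"table"`. The file name, store keys, and complib value below are assumptions; the real code builds keys with `pipeline_table_key()` and reads the complib from the `pipeline_complib` setting.

```python
# Sketch only: the empty-column fallback used by the HDF5 branch of write_df().
# Requires the "tables" (PyTables) package; names and complib are illustrative.
import pandas as pd

with pd.HDFStore("example_pipeline.h5", mode="a") as store:
    empty = pd.DataFrame(index=[0, 1, 2])  # no columns -> fixed format, no complib
    store.put("empty_table/initial_checkpoint", empty)

    full = pd.DataFrame({"household_id": [1, 2, 3]})  # has columns -> compressed table
    store.put("households/initial_checkpoint", full, format="table", complib="zlib")
    store.flush()
```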