3 changes: 3 additions & 0 deletions docs/source/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.10.12 (2025-XX-XX)
- Feat: Automatically check cache-validity of polars DataFrame tasks marked as lazy

## 0.10.11 (2025-09-08)
- Fix: Late initialization of ParquetTableCache instance_id allows use of multi-config `@input_stage_versions`

Expand Down
14 changes: 9 additions & 5 deletions docs/source/examples/realistic_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,14 +323,18 @@ are `sqlalchemy.Table`, `pandas.DataFrame`, `polars.DataFrame`, or `polars.LazyF

### Controlling automatic cache invalidation

For input_type `sa.Table`, and `pdt.SqlAlchemy`, in general, it is best to set lazy=True. This means the task is always
For input_type `sa.Table`, and `pdt.SqlAlchemy`, in general, it is best to set `lazy=True`. This means the task is always
executed because producing a query is fast, but the query is only executed when it is actually needed. For
`pl.LazyFrame`, `version=AUTO_VERSION` is a good choice, because then the task is executed once with empty input
dataframes and only if resulting LazyFrame expressions change, the task is executed again with full input data. For
`pd.DataFrame` and `pl.DataFrame`, we don't try to guess which changes of the code are actually meaningful. Thus the
user needs to help manually bumpig a version number like `version="1.0.0"`. For development, `version=None` simply
dataframes and only if resulting LazyFrame expressions change, the task is executed again with full input data.

For `pd.DataFrame` and `pl.DataFrame`, we don't try to guess which changes of the code are actually meaningful. Thus the
user needs to help manually bumping a version number like `version="1.0.0"`. For development, `version=None` simply
deactivates caching until the code is more stable. It is recommended to always develop with small pipeline instances
anyways to achieve high iteration speed (see [multi_instance_pipeline.md](multi_instance_pipeline.md)).
anyways to achieve high iteration speed (see [multi_instance_pipeline.md](multi_instance_pipeline.md)). Setting `lazy=True` and `version=None`
for `pl.DataFrame` executes the task, but hashes the result to determine the cache-validity of the task output and hence
the cache invalidation of downstream tasks. This is a good choice for small tasks which are quick to compute and
where bumping the version number adds unwanted complexity to the development process.

### Integration with pydiverse colspec (same as dataframely but with pydiverse transform based SQL support)

Expand Down
5 changes: 4 additions & 1 deletion docs/source/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,12 @@ In this case, the task must produce a SQLAlchemy expression for
all tabular outputs without executing them. Pipedag can render the query and will only produce a table based on this
query expression if the query changed or one of the inputs to the task changed.

For tasks returning a Polars DataFrame, the hash of the resulting DataFrame is used to determine whether to
cache-invalidate downstream tasks.
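
The cache check for lazy tasks can be sketched as comparing a fingerprint of the rendered query against the previous run (illustrative only; pipedag's real cache keys also incorporate task inputs and configuration):

```python
import hashlib

# Maps task name -> fingerprint of the last rendered query (hypothetical store).
query_cache: dict[str, str] = {}


def run_lazy_task(name: str, query: str) -> bool:
    """Return True if the query must actually be executed."""
    key = hashlib.sha256(query.encode("utf-8")).hexdigest()
    if query_cache.get(name) == key:
        return False  # same query as last run: reuse the materialized table
    query_cache[name] = key
    return True  # query changed (or first run): execute and materialize


assert run_lazy_task("t", "SELECT 1 AS x")      # first run: executes
assert not run_lazy_task("t", "SELECT 1 AS x")  # unchanged query: cached
assert run_lazy_task("t", "SELECT 2 AS x")      # changed query: re-executed
```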

### Manual cache invalidation with `version` parameter

For non-SQL tasks, the `version` parameter of the {py:func}`@materialize <pydiverse.pipedag.materialize>` decorator must
For non-lazy tasks, the `version` parameter of the {py:func}`@materialize <pydiverse.pipedag.materialize>` decorator must
be used for manual cache invalidation. As long as the version stays the same, it is assumed that the code of the task
did not materially change and will produce the same outputs given the same inputs. We refrained from automatically
inspecting any python code changes since this would break at shared code changes where it is very hard to distinguish
Expand Down
13 changes: 12 additions & 1 deletion src/pydiverse/pipedag/backend/table/sql/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from pydiverse.common import Date, Dtype, PandasBackend
from pydiverse.common.util.computation_tracing import ComputationTracer
from pydiverse.common.util.hashing import stable_hash
from pydiverse.common.util.hashing import hash_polars_dataframe, stable_hash
from pydiverse.pipedag import ConfigContext
from pydiverse.pipedag._typing import T
from pydiverse.pipedag.backend.table.sql.ddl import (
Expand Down Expand Up @@ -568,6 +568,11 @@ def materialize(
def retrieve(cls, *args, **kwargs):
raise RuntimeError("This should never get called.")

@classmethod
def lazy_query_str(cls, store: SQLTableStore, obj: ExternalTableReference) -> str:
obj_hash = stable_hash(obj.name, obj.schema)
return obj_hash


# endregion

Expand Down Expand Up @@ -1239,6 +1244,12 @@ def dialect_supports_polars_native_read(cls):
# for most dialects we find a way
return True

@classmethod
def lazy_query_str(cls, store: SQLTableStore, obj: pl.DataFrame) -> str:
_ = store
obj_hash = hash_polars_dataframe(obj)
return obj_hash


@SQLTableStore.register_table(pl)
class LazyPolarsTableHook(TableHook[SQLTableStore]):
Expand Down
40 changes: 33 additions & 7 deletions src/pydiverse/pipedag/materialize/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,40 @@ def materialize(
:param lazy:
Whether this task is lazy or not.

Unlike a normal task, lazy tasks always get executed. However, if a lazy
task produces a lazy table (e.g. a SQL query), the table store checks if
the same query has been executed before. If this is the case, then the
query doesn't get executed, and instead, the table gets copied from the cache.
Unlike a normal task, lazy tasks always get executed. However, before a table
returned by a lazy task gets materialized, the table store checks if
the same table has been materialized before. If this is the case, then the
table doesn't get materialized, and instead, the table gets copied from the cache.

This is efficient for tasks that return SQL queries, because the query
only gets generated but will not be executed again if the resulting table is cache-valid.

The same also works for :py:class:`ExternalTableReference <pydiverse.pipedag.container.ExternalTableReference>`,
where the "query" is just the identifier of the table in the store.

.. Note:: For tasks returning an ``ExternalTableReference`` pipedag cannot automatically
know if the external tables have changed or not. This should be controlled via a cache function
given via the ``cache`` argument of ``materialize``.
See :py:class:`ExternalTableReference <pydiverse.pipedag.container.ExternalTableReference>`
for an example.


For tasks returning a Polars DataFrame, the output is deemed cache-valid
if the hash of the resulting DataFrame is the same as the hash of the previous run.
So, even though the task always gets executed, downstream tasks can remain cache-valid
if the DataFrame is the same as before. This is useful for small tasks that are hard to
implement using only LazyFrames, but where the DataFrame generation is cheap.
In both cases, you don't need to manually bump the ``version`` of a lazy task.

.. Warning:: A task returning a Polars LazyFrame should `not` be marked as lazy.
Use ``version=AUTO_VERSION`` instead. See :py:class:`AUTO_VERSION`.
.. Warning:: A task returning a Pandas DataFrame should `not` be marked as lazy.
No hashing is implemented for Pandas DataFrames, so the task will always
be deemed cache-invalid, and thus, cache-invalidate all downstream tasks.

This behaviour is very useful, because you don't need to manually bump
the `version` of a lazy task. This only works because for lazy tables
generating the query is very cheap compared to executing it.
:param group_node_tag:
Set a tag that may add this task to a configuration based group node.
:param nout:
Expand Down
33 changes: 17 additions & 16 deletions tests/test_cache/test_basic_cache_invalidation.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,22 @@ def test_change_task_version_blob(mocker):
child_spy.assert_called_once()


def test_change_lazy_query(mocker):
@pytest.mark.parametrize(
"get_tbl_obj",
[
lambda query_value: select_as(query_value, "x"),
(lambda query_value: pl.DataFrame({"x": [query_value]})) if pl else None,
],
ids=["sql", "polars"],
)
def test_change_lazy_query(mocker, get_tbl_obj):
if get_tbl_obj is None:
pytest.skip("Polars is not installed, skipping Polars test.")
query_value = 1

@materialize(lazy=True, nout=2)
def lazy_task():
return 0, Table(select_as(query_value, "x"), name="lazy_table")
return 0, Table(get_tbl_obj(query_value), name="lazy_table")

@materialize(input_type=pd.DataFrame, version="1.0")
def get_first(table, col):
Expand Down Expand Up @@ -991,11 +1001,10 @@ def test_ignore_task_version(mocker):
def test_lazy_table_without_query_string(mocker):
value = None

@materialize(lazy=True, nout=5)
@materialize(lazy=True, nout=4)
def falsely_lazy_task():
return (
Table(pd.DataFrame({"x": [value]}), name="pd_table"),
Table(pl.DataFrame({"x": [value]}), name="pl_table"),
Table(pl.DataFrame({"x": [value]}).lazy(), name="pl_lazy_table"),
select_as(2, "y"),
3,
Expand All @@ -1004,42 +1013,34 @@ def falsely_lazy_task():
def get_flow():
with Flow() as flow:
with Stage("stage_1"):
pd_tbl, pl_tbl, pl_lazy_tbl, select_tbl, constant = falsely_lazy_task()
pd_tbl, pl_lazy_tbl, select_tbl, constant = falsely_lazy_task()
res_pd = m.take_first(pd_tbl, as_int=True)
res_pl = m.take_first(pl_tbl, as_int=True)
res_pl_lazy = m.take_first(pl_lazy_tbl, as_int=True)
res_select = m.noop(select_tbl)
res_constant = m.noop(constant)
return flow, res_pd, res_pl, res_pl_lazy, res_select, res_constant
return flow, res_pd, res_pl_lazy, res_select, res_constant

value = 0
flow, res_pd, res_pl, res_pl_lazy, res_select, res_constant = get_flow()
flow, res_pd, res_pl_lazy, res_select, res_constant = get_flow()
with StageLockContext():
result = flow.run()
assert result.get(res_pd) == 0
assert result.get(res_pl) == 0
assert result.get(res_pl_lazy) == 0

value = 1
flow, res_pd, res_pl, res_pl_lazy, res_select, res_constant = get_flow()
flow, res_pd, res_pl_lazy, res_select, res_constant = get_flow()
res_pd_spy = spy_task(mocker, res_pd)
res_pl_spy = spy_task(mocker, res_pl)
res_pl_lazy_spy = spy_task(mocker, res_pl_lazy)
select_spy = spy_task(mocker, res_select)
constant_spy = spy_task(mocker, res_constant)
with StageLockContext():
result = flow.run()
assert result.get(res_pd) == 1
assert result.get(res_pl) == 1
assert result.get(res_pl_lazy) == 1
# res_pd is downstream of a pd.DataFrame from a lazy task,
# which should always be cache invalid. Hence, it should always be called.
res_pd_spy.assert_called_once()

# res_pd is downstream of a pl.DataFrame from a lazy task,
# which should always be cache invalid. Hence, it should always be called.
res_pl_spy.assert_called_once()

# res_pl_lazy is downstream of a pl.LazyFrame from a lazy task,
# which should always be cache invalid. Hence, it should always be called.
# To avoid cache-invalidating the LazyFrame, we should use AUTO_VERSION.
Expand Down