
Commit b931c5b

docs and ExternalTableReference
1 parent b23db92 commit b931c5b

File tree

4 files changed: +53 -15 lines


docs/source/examples/realistic_pipeline.md

Lines changed: 9 additions & 5 deletions
@@ -323,14 +323,18 @@ are `sqlalchemy.Table`, `pandas.DataFrame`, `polars.DataFrame`, or `polars.LazyFrame`
 
 ### Controlling automatic cache invalidation
 
-For input_type `sa.Table`, and `pdt.SqlAlchemy`, in general, it is best to set lazy=True. This means the task is always
+For input_type `sa.Table` and `pdt.SqlAlchemy`, in general, it is best to set `lazy=True`. This means the task is always
 executed because producing a query is fast, but the query is only executed when it is actually needed. For
 `pl.LazyFrame`, `version=AUTO_VERSION` is a good choice, because then the task is executed once with empty input
-dataframes and only if resulting LazyFrame expressions change, the task is executed again with full input data. For
-`pd.DataFrame` and `pl.DataFrame`, we don't try to guess which changes of the code are actually meaningful. Thus the
-user needs to help manually bumpig a version number like `version="1.0.0"`. For development, `version=None` simply
+dataframes, and only if the resulting LazyFrame expressions change is the task executed again with full input data.
+
+For `pd.DataFrame` and `pl.DataFrame`, we don't try to guess which changes of the code are actually meaningful. Thus the
+user needs to help by manually bumping a version number like `version="1.0.0"`. For development, `version=None` simply
 deactivates caching until the code is more stable. It is recommended to always develop with small pipeline instances
-anyways to achieve high iteration speed (see [multi_instance_pipeline.md](multi_instance_pipeline.md)).
+anyway to achieve high iteration speed (see [multi_instance_pipeline.md](multi_instance_pipeline.md)). Setting `lazy=True` and `version=None`
+for `pl.DataFrame` executes the task, but hashes the result to determine the cache-validity of the task output and hence
+the cache invalidation of downstream tasks. This is a good choice for small tasks which are quick to compute and
+where bumping the version number adds unwanted complexity to the development process.
 
 ### Integration with pydiverse colspec (same as dataframely but with pydiverse transform based SQL support)

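To make these options concrete, the following sketch (not part of this commit) shows one task per setting. It assumes pipedag's documented top-level API (`materialize`, `Table`, and `AUTO_VERSION` imported from `pydiverse.pipedag`); the table names and transformations are invented, and running the tasks would additionally require a `Flow` with stages.

```python
import polars as pl
import sqlalchemy as sa

from pydiverse.pipedag import AUTO_VERSION, Table, materialize


@materialize(lazy=True, input_type=sa.Table)
def filter_orders(orders):
    # lazy=True: only a query expression is produced here; the table store skips
    # executing it when the rendered query (and the inputs) are cache-valid.
    return Table(sa.select(orders).where(orders.c.amount > 0), name="orders_filtered")


@materialize(version=AUTO_VERSION, input_type=pl.LazyFrame)
def add_margin(orders: pl.LazyFrame):
    # AUTO_VERSION: the task first runs on empty frames; only if the resulting
    # LazyFrame expressions changed is it run again on the full input data.
    return Table(orders.with_columns(margin=pl.col("revenue") - pl.col("cost")))


@materialize(version="1.0.0", input_type=pl.DataFrame)
def score_customers(customers: pl.DataFrame):
    # Eager DataFrame task: bump the version string whenever the logic changes.
    return Table(customers.with_columns(score=pl.col("revenue").rank()))


@materialize(lazy=True, version=None, input_type=pl.DataFrame)
def segment_lookup(customers: pl.DataFrame):
    # Cheap eager task: always executed, but the hash of the returned DataFrame
    # decides whether downstream tasks stay cache-valid.
    return Table(customers.select("customer_id", "segment").unique())
```
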
docs/source/quickstart.md

Lines changed: 4 additions & 1 deletion
@@ -186,9 +186,12 @@ database. Ideally, this is combined with `lazy=True`. In this case, the task must return
 all tabular outputs without executing them. Pipedag can render the query and will only produce a table based on this
 query expression if the query changed or one of the inputs to the task changed.
 
+For tasks returning a Polars DataFrame, the hash of the resulting DataFrame is used to determine whether to
+cache-invalidate downstream tasks.
+
 ### Manual cache invalidation with `version` parameter
 
-For non-SQL tasks, the `version` parameter of the {py:func}`@materialize <pydiverse.pipedag.materialize>` decorator must
+For non-lazy tasks, the `version` parameter of the {py:func}`@materialize <pydiverse.pipedag.materialize>` decorator must
 be used for manual cache invalidation. As long as the version stays the same, it is assumed that the code of the task
 did not materially change and will produce the same outputs given the same inputs. We refrained from automatically
 inspecting any python code changes since this would break at shared code changes where it is very hard to distinguish

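As an illustration of the `version` parameter described in this hunk (again not part of the commit, with a made-up task body and pipedag's documented `materialize`/`Table` API assumed):

```python
import pandas as pd

from pydiverse.pipedag import Table, materialize


@materialize(version="1.0.0", input_type=pd.DataFrame)
def clean_names(customers: pd.DataFrame):
    # While version stays "1.0.0", pipedag assumes this code is unchanged and
    # reuses the cached output whenever the inputs are cache-valid.
    customers["name"] = customers["name"].str.strip().str.title()
    return Table(customers, name="customers_clean")


# If the cleaning logic changes (e.g. duplicates are dropped as well), bump the
# version, e.g. version="1.1.0", to cache-invalidate this task and its
# downstream consumers.
```
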
src/pydiverse/pipedag/backend/table/sql/hooks.py

Lines changed: 7 additions & 2 deletions
@@ -562,6 +562,11 @@ def materialize(
     def retrieve(cls, *args, **kwargs):
         raise RuntimeError("This should never get called.")
 
+    @classmethod
+    def lazy_query_str(cls, store: SQLTableStore, obj: ExternalTableReference) -> str:
+        obj_hash = stable_hash(obj.name, obj.schema)
+        return obj_hash
+
 
 # endregion
 
@@ -1214,8 +1219,8 @@ def dialect_supports_polars_native_read(cls):
     @classmethod
     def lazy_query_str(cls, store: SQLTableStore, obj: pl.DataFrame) -> str:
         _ = store
-        hash_of_df = hash_polars_dataframe(obj)
-        return hash_of_df
+        obj_hash = hash_polars_dataframe(obj)
+        return obj_hash
 
 
 @SQLTableStore.register_table(pl)

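The two hooks above both implement `lazy_query_str`, which turns the returned object into a string that acts as a cache fingerprint: a stable hash of name and schema for `ExternalTableReference`, and a hash of the data for `pl.DataFrame`. The following toy sketch (not pipedag code; `hashlib` stands in for `stable_hash`) illustrates how such a fingerprint drives cache-validity:

```python
import hashlib


def fingerprint(*parts: str) -> str:
    # Stand-in for pipedag's stable_hash(): a deterministic digest of the parts.
    return hashlib.sha256("|".join(parts).encode()).hexdigest()[:16]


previous_run = fingerprint("external_sales", "landing")  # name, schema
current_run = fingerprint("external_sales", "landing")
assert previous_run == current_run  # unchanged reference -> table copied from cache

moved = fingerprint("external_sales", "staging")
assert moved != previous_run  # different schema -> treated as cache-invalid
```
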
src/pydiverse/pipedag/materialize/core.py

Lines changed: 33 additions & 7 deletions
@@ -109,14 +109,40 @@ def materialize(
     :param lazy:
         Whether this task is lazy or not.
 
-        Unlike a normal task, lazy tasks always get executed. However, if a lazy
-        task produces a lazy table (e.g. a SQL query), the table store checks if
-        the same query has been executed before. If this is the case, then the
-        query doesn't get executed, and instead, the table gets copied from the cache.
+        Unlike a normal task, lazy tasks always get executed. However, before a table
+        returned by a lazy task gets materialized, the table store checks if
+        the same table has been materialized before. If this is the case, then the
+        table doesn't get materialized, and instead, the table gets copied from the cache.
+
+        This is efficient for tasks that return SQL queries, because the query
+        only gets generated but will not be executed again if the resulting table is cache-valid.
+
+        The same also works for :py:class:`ExternalTableReference <pydiverse.pipedag.container.ExternalTableReference>`,
+        where the "query" is just the identifier of the table in the store.
+
+        .. Note:: For tasks returning an ``ExternalTableReference``, pipedag cannot automatically
+            know if the external table has changed or not. This should be controlled via a cache function
+            given via the ``cache`` argument of ``materialize``.
+            See :py:class:`ExternalTableReference <pydiverse.pipedag.container.ExternalTableReference>`
+            for an example.
+
+
+        For tasks returning a Polars DataFrame, the output is deemed cache-valid
+        if the hash of the resulting DataFrame is the same as the hash of the previous run.
+        So, even though the task always gets executed, downstream tasks can remain cache-valid
+        if the DataFrame is the same as before. This is useful for small tasks that are hard to
+        implement using only LazyFrames, but where the DataFrame generation is cheap.
+
+
+
+        In both cases, you don't need to manually bump the ``version`` of a lazy task.
+
+        .. Warning:: A task returning a Polars LazyFrame should `not` be marked as lazy.
+            Use ``version=AUTO_VERSION`` instead. See :py:class:`AUTO_VERSION`.
+        .. Warning:: A task returning a Pandas DataFrame should `not` be marked as lazy.
+            No hashing is implemented for Pandas DataFrames, so the task will always
+            be deemed cache-invalid, and thus cache-invalidate all downstream tasks.
 
-        This behaviour is very useful, because you don't need to manually bump
-        the `version` of a lazy task. This only works because for lazy tables
-        generating the query is very cheap compared to executing it.
     :param group_node_tag:
         Set a tag that may add this task to a configuration based group node.
     :param nout:

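The note about `ExternalTableReference` and the ``cache`` argument might look like this in practice. This is an illustrative sketch only: the connection string, the `landing.sales` table, and the freshness query are invented, and it assumes `ExternalTableReference` is importable from `pydiverse.pipedag` as documented.

```python
import sqlalchemy as sa

from pydiverse.pipedag import ExternalTableReference, materialize

engine = sa.create_engine("postgresql://user:pass@localhost/db")  # placeholder


def _sales_fingerprint():
    # Hypothetical freshness check: return a value that changes whenever the
    # externally managed table changes, e.g. its latest load timestamp.
    with engine.connect() as conn:
        return conn.execute(sa.text("SELECT max(loaded_at) FROM landing.sales")).scalar()


@materialize(lazy=True, cache=_sales_fingerprint)
def external_sales():
    # The "query" is just the identifier of the table in the store; the cache
    # function above decides when downstream tasks get cache-invalidated.
    return ExternalTableReference("sales", schema="landing")
```
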