Commit 4644293
docs and ExternalTableReference
1 parent e6be3a7 commit 4644293

4 files changed: +53 -15 lines

docs/source/examples/realistic_pipeline.md

Lines changed: 9 additions & 5 deletions
@@ -323,14 +323,18 @@ are `sqlalchemy.Table`, `pandas.DataFrame`, `polars.DataFrame`, or `polars.LazyF
 
 ### Controlling automatic cache invalidation
 
-For input_type `sa.Table`, and `pdt.SqlAlchemy`, in general, it is best to set lazy=True. This means the task is always
+For input_type `sa.Table` and `pdt.SqlAlchemy`, in general, it is best to set `lazy=True`. This means the task is always
 executed because producing a query is fast, but the query is only executed when it is actually needed. For
 `pl.LazyFrame`, `version=AUTO_VERSION` is a good choice, because then the task is executed once with empty input
-dataframes and only if resulting LazyFrame expressions change, the task is executed again with full input data. For
-`pd.DataFrame` and `pl.DataFrame`, we don't try to guess which changes of the code are actually meaningful. Thus the
-user needs to help manually bumpig a version number like `version="1.0.0"`. For development, `version=None` simply
+dataframes, and the task is executed again with full input data only if the resulting LazyFrame expressions change.
+
+For `pd.DataFrame` and `pl.DataFrame`, we don't try to guess which code changes are actually meaningful. Thus the
+user needs to help by manually bumping a version number like `version="1.0.0"`. For development, `version=None` simply
 deactivates caching until the code is more stable. It is recommended to always develop with small pipeline instances
-anyways to achieve high iteration speed (see [multi_instance_pipeline.md](multi_instance_pipeline.md)).
+anyway to achieve high iteration speed (see [multi_instance_pipeline.md](multi_instance_pipeline.md)). Setting `lazy=True` and `version=None`
+for `pl.DataFrame` executes the task, but hashes the result to determine the cache-validity of the task output and hence
+the cache invalidation of downstream tasks. This is a good choice for small tasks which are quick to compute and
+where bumping the version number adds unwanted complexity to the development process.
 
 ### Integration with pydiverse colspec (same as dataframely but with pydiverse transform based SQL support)
 
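
The cache-control choices described in this documentation section are easier to see side by side. Below is a minimal, hypothetical sketch (not part of this commit): the task bodies, column names, and table names are invented, and it assumes the usual pydiverse.pipedag exports (`materialize`, `Table`, `AUTO_VERSION`).

# Illustrative only: three hypothetical tasks, one per cache-control strategy
# discussed above.
import pandas as pd
import polars as pl
import sqlalchemy as sa

from pydiverse.pipedag import AUTO_VERSION, Table, materialize


@materialize(lazy=True, input_type=sa.Table)
def filter_positive(src: sa.Table):
    # lazy=True: the task always runs, but only produces a query; the query is
    # executed only if it (or one of its inputs) changed since the last run.
    return Table(sa.select(src).where(src.c.amount > 0), name="filtered")


@materialize(version=AUTO_VERSION, input_type=pl.LazyFrame)
def add_ratio(df: pl.LazyFrame):
    # AUTO_VERSION: executed once on empty input frames to capture the
    # LazyFrame expression; rerun with full data only if that expression changes.
    return Table(df.with_columns((pl.col("a") / pl.col("b")).alias("ratio")))


@materialize(version="1.0.0", input_type=pd.DataFrame)
def aggregate(df: pd.DataFrame):
    # Eager pandas task: bump the version string manually whenever the logic
    # changes in a way that should invalidate the cache (version=None disables caching).
    return Table(df.groupby("key", as_index=False).sum())
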
docs/source/quickstart.md

Lines changed: 4 additions & 1 deletion
@@ -187,9 +187,12 @@ In this case, the task must produce a SQLAlchemy expression for
 all tabular outputs without executing them. Pipedag can render the query and will only produce a table based on this
 query expression if the query changed or one of the inputs to the task changed.
 
+For tasks returning a Polars DataFrame, the hash of the resulting DataFrame is used to determine whether to
+cache-invalidate downstream tasks.
+
 ### Manual cache invalidation with `version` parameter
 
-For non-SQL tasks, the `version` parameter of the {py:func}`@materialize <pydiverse.pipedag.materialize>` decorator must
+For non-lazy tasks, the `version` parameter of the {py:func}`@materialize <pydiverse.pipedag.materialize>` decorator must
 be used for manual cache invalidation. As long as the version stays the same, it is assumed that the code of the task
 did not materially change and will produce the same outputs given the same inputs. We refrained from automatically
 inspecting any python code changes since this would break at shared code changes where it is very hard to distinguish
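
As an illustration of the hashing behaviour mentioned above (a hypothetical sketch, not from the commit): a lazy task returning a Polars DataFrame always runs, but downstream tasks are only cache-invalidated when the hash of the returned frame changes. The table contents are invented.

# Hypothetical example: the task runs on every flow execution, yet as long as
# the returned DataFrame hashes to the same value, tasks consuming
# `country_codes` stay cache-valid.
import polars as pl

from pydiverse.pipedag import Table, materialize


@materialize(lazy=True)
def country_codes():
    # Cheap to recompute; the materialized result is hashed to decide whether
    # downstream tasks need to be re-executed.
    return Table(
        pl.DataFrame({"code": ["DE", "FR", "IT"], "name": ["Germany", "France", "Italy"]}),
        name="country_codes",
    )
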

src/pydiverse/pipedag/backend/table/sql/hooks.py

Lines changed: 7 additions & 2 deletions
@@ -568,6 +568,11 @@ def materialize(
     def retrieve(cls, *args, **kwargs):
         raise RuntimeError("This should never get called.")
 
+    @classmethod
+    def lazy_query_str(cls, store: SQLTableStore, obj: ExternalTableReference) -> str:
+        obj_hash = stable_hash(obj.name, obj.schema)
+        return obj_hash
+
 
 # endregion
 
@@ -1242,8 +1247,8 @@ def dialect_supports_polars_native_read(cls):
     @classmethod
     def lazy_query_str(cls, store: SQLTableStore, obj: pl.DataFrame) -> str:
         _ = store
-        hash_of_df = hash_polars_dataframe(obj)
-        return hash_of_df
+        obj_hash = hash_polars_dataframe(obj)
+        return obj_hash
 
 
 @SQLTableStore.register_table(pl)
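
The hunks above wire ExternalTableReference into the same mechanism that already hashes Polars DataFrames: the "lazy query string" becomes a stable hash of the table identifier. A self-contained stand-in (not pipedag's actual `stable_hash` implementation) illustrating the idea:

# Conceptual stand-in only; pipedag's stable_hash is not reproduced here.
import hashlib


def identifier_fingerprint(name: str, schema: str | None) -> str:
    # Hash the fully qualified identifier. The fingerprint (and therefore the
    # cache key of consuming tasks) changes only when name or schema changes.
    payload = f"{schema or ''}.{name}".encode()
    return hashlib.sha256(payload).hexdigest()[:16]


print(identifier_fingerprint("customers", "landing"))  # deterministic hex fingerprint
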

src/pydiverse/pipedag/materialize/core.py

Lines changed: 33 additions & 7 deletions
@@ -111,14 +111,40 @@ def materialize(
     :param lazy:
         Whether this task is lazy or not.
 
-        Unlike a normal task, lazy tasks always get executed. However, if a lazy
-        task produces a lazy table (e.g. a SQL query), the table store checks if
-        the same query has been executed before. If this is the case, then the
-        query doesn't get executed, and instead, the table gets copied from the cache.
+        Unlike a normal task, lazy tasks always get executed. However, before the table
+        returned by a lazy task gets materialized, the table store checks if
+        the same table has been materialized before. If this is the case, then the
+        table doesn't get materialized, and instead, the table gets copied from the cache.
+
+        This is efficient for tasks that return SQL queries, because the query
+        only gets generated but will not be executed again if the resulting table is cache-valid.
+
+        The same also works for :py:class:`ExternalTableReference <pydiverse.pipedag.container.ExternalTableReference>`,
+        where the "query" is just the identifier of the table in the store.
+
+        .. Note:: For tasks returning an ``ExternalTableReference``, pipedag cannot automatically
+            know if the external table has changed or not. This should be controlled via a cache function
+            given via the ``cache`` argument of ``materialize``.
+            See :py:class:`ExternalTableReference <pydiverse.pipedag.container.ExternalTableReference>`
+            for an example.
+
+
+        For tasks returning a Polars DataFrame, the output is deemed cache-valid
+        if the hash of the resulting DataFrame is the same as the hash of the previous run.
+        So, even though the task always gets executed, downstream tasks can remain cache-valid
+        if the DataFrame is the same as before. This is useful for small tasks that are hard to
+        implement using only LazyFrames, but where the DataFrame generation is cheap.
+
+
+
+        In both cases, you don't need to manually bump the ``version`` of a lazy task.
+
+        .. Warning:: A task returning a Polars LazyFrame should `not` be marked as lazy.
+            Use ``version=AUTO_VERSION`` instead. See :py:class:`AUTO_VERSION`.
+        .. Warning:: A task returning a Pandas DataFrame should `not` be marked as lazy.
+            No hashing is implemented for Pandas DataFrames, so the task will always
+            be deemed cache-invalid, and thus cache-invalidate all downstream tasks.
 
-        This behaviour is very useful, because you don't need to manually bump
-        the `version` of a lazy task. This only works because for lazy tables
-        generating the query is very cheap compared to executing it.
     :param group_node_tag:
         Set a tag that may add this task to a configuration based group node.
     :param nout:
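
To make the ``cache`` hint in the updated docstring concrete, here is a hypothetical usage sketch (not from this commit): the connection string, schema, and the `updated_at` probe are invented, and it assumes `ExternalTableReference` is importable from `pydiverse.pipedag.container` as referenced above.

# Hypothetical sketch: a lazy task exposing an externally managed table, with a
# cache function that probes the source so pipedag can detect changes.
import sqlalchemy as sa

from pydiverse.pipedag import Table, materialize
from pydiverse.pipedag.container import ExternalTableReference

# Placeholder connection string; replace with a real DSN.
ENGINE = sa.create_engine("postgresql://user:password@host:5432/db")


def customers_last_modified():
    # Cache function: any cheap, deterministic probe works (row count,
    # max timestamp, ...). If the returned value changes, the task output is
    # considered cache-invalid.
    with ENGINE.connect() as conn:
        return conn.execute(
            sa.text("SELECT max(updated_at) FROM landing.customers")
        ).scalar()


@materialize(lazy=True, cache=customers_last_modified)
def external_customers():
    # Expose the externally managed table to downstream tasks without copying it.
    return Table(ExternalTableReference("customers", schema="landing"))
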
