Skip to content

Commit ea62863

Browse files
authored
fix!: Runs should trigger materialization result (#50)
* chore: update uv.lock * chore: add pydantic as a dep * fix!: runs now trigger MaterializeResult A bug was found that if a plan was empty. Also adds more metadata for each asset * fix: remove some unnecessary logging * fix: update dagster_sqlmesh/resource.py * fix: more fixes * fix: remove unnecessary auto-execute
1 parent ac94c6e commit ea62863

File tree

6 files changed

+545
-89
lines changed

6 files changed

+545
-89
lines changed

dagster_sqlmesh/console.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from sqlmesh.core.console import Console
1111
from sqlmesh.core.context_diff import ContextDiff
1212
from sqlmesh.core.environment import EnvironmentNamingInfo
13-
from sqlmesh.core.plan import EvaluatablePlan, PlanBuilder
13+
from sqlmesh.core.plan import EvaluatablePlan, Plan as SQLMeshPlan, PlanBuilder
1414
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory, SnapshotInfoLike
1515
from sqlmesh.core.table_diff import RowDiff, SchemaDiff, TableDiff
1616
from sqlmesh.utils.concurrency import NodeExecutionFailedError
@@ -219,6 +219,10 @@ class PrintEnvironments(BaseConsoleEvent):
219219
class ShowTableDiffSummary(BaseConsoleEvent):
220220
table_diff: TableDiff
221221

222+
@dataclass(kw_only=True)
223+
class PlanBuilt(BaseConsoleEvent):
224+
plan: SQLMeshPlan
225+
222226
ConsoleEvent = (
223227
StartPlanEvaluation
224228
| StopPlanEvaluation
@@ -263,6 +267,7 @@ class ShowTableDiffSummary(BaseConsoleEvent):
263267
| ConsoleException
264268
| PrintEnvironments
265269
| ShowTableDiffSummary
270+
| PlanBuilt
266271
)
267272

268273
ConsoleEventHandler = t.Callable[[ConsoleEvent], None]
@@ -424,7 +429,22 @@ def add_handler(self, handler: ConsoleEventHandler) -> str:
424429

425430
def remove_handler(self, handler_id: str) -> None:
426431
del self._handlers[handler_id]
427-
432+
433+
def plan(self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: str | None, no_diff: bool = False, no_prompts: bool = False) -> None:
434+
"""Plan is not a console event. This triggers building of a plan and
435+
applying said plan
436+
437+
This method is called by SQLMesh to start the plan process (when you
438+
call Context#plan)
439+
440+
This overriden method ignores the options passed in at this time
441+
"""
442+
443+
plan_builder.apply()
444+
445+
def capture_built_plan(self, plan: SQLMeshPlan) -> None:
446+
"""Capture the built plan and publish a PlanBuilt event."""
447+
self.publish(PlanBuilt(plan=plan))
428448

429449
class EventConsole(IntrospectingConsole):
430450
"""

dagster_sqlmesh/controller/base.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from sqlmesh.core.console import set_console
1010
from sqlmesh.core.context import Context
1111
from sqlmesh.core.model import Model
12-
from sqlmesh.core.plan import PlanBuilder
12+
from sqlmesh.core.plan import Plan as SQLMeshPlan, PlanBuilder
1313
from sqlmesh.utils.dag import DAG
1414
from sqlmesh.utils.date import TimeLike
1515

@@ -19,7 +19,6 @@
1919
ConsoleEventHandler,
2020
ConsoleException,
2121
EventConsole,
22-
Plan,
2322
SnapshotCategorizer,
2423
)
2524
from dagster_sqlmesh.events import ConsoleGenerator
@@ -179,16 +178,7 @@ def run_sqlmesh_thread(
179178
) -> None:
180179
logger.debug("dagster-sqlmesh: thread started")
181180

182-
def auto_execute_plan(event: ConsoleEvent):
183-
if isinstance(event, Plan):
184-
try:
185-
event.plan_builder.apply()
186-
except Exception as e:
187-
controller.console.exception(e)
188-
return None
189-
190181
try:
191-
controller.console.add_handler(auto_execute_plan)
192182
builder = t.cast(
193183
PlanBuilder,
194184
context.plan_builder(
@@ -381,6 +371,11 @@ def non_external_models_dag(self) -> t.Iterable[tuple[Model, set[str]]]:
381371
continue
382372
yield (model, deps)
383373

374+
375+
class ContextApplyFunction(t.Protocol):
376+
def __call__(self, plan: SQLMeshPlan, *args: t.Any, **kwargs: t.Any) -> None:
377+
...
378+
384379
class SQLMeshController(t.Generic[ContextCls]):
385380
"""Allows control of sqlmesh via a python interface. It is not suggested to
386381
use the constructor of this class directly, but instead use the provided
@@ -480,7 +475,21 @@ def _create_context(self) -> ContextCls:
480475
if self.config.sqlmesh_config:
481476
options["config"] = self.config.sqlmesh_config
482477
set_console(self.console)
483-
return self._context_factory(**options)
478+
context = self._context_factory(**options)
479+
480+
# As part of the context, it specifies a method "apply" that we would
481+
# like to introspect. To do so we replace "apply" with a wrapped
482+
# function that generates a special console event unique to
483+
# dagster-sqlmesh
484+
def wrap_apply_event(f: ContextApplyFunction) -> ContextApplyFunction:
485+
def wrapped_apply(plan: SQLMeshPlan, *args: t.Any, **kwargs: t.Any):
486+
self.logger.debug("capturing plan as event")
487+
self.console.capture_built_plan(plan)
488+
result = f(plan, *args, **kwargs)
489+
return result
490+
return wrapped_apply
491+
context.apply = wrap_apply_event(context.apply)
492+
return context
484493

485494
@contextmanager
486495
def instance(

0 commit comments

Comments
 (0)