Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions dagster_sqlmesh/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from sqlmesh.core.console import Console
from sqlmesh.core.context_diff import ContextDiff
from sqlmesh.core.environment import EnvironmentNamingInfo
from sqlmesh.core.plan import EvaluatablePlan, PlanBuilder
from sqlmesh.core.plan import EvaluatablePlan, Plan as SQLMeshPlan, PlanBuilder
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory, SnapshotInfoLike
from sqlmesh.core.table_diff import RowDiff, SchemaDiff, TableDiff
from sqlmesh.utils.concurrency import NodeExecutionFailedError
Expand Down Expand Up @@ -219,6 +219,10 @@ class PrintEnvironments(BaseConsoleEvent):
class ShowTableDiffSummary(BaseConsoleEvent):
table_diff: TableDiff

@dataclass(kw_only=True)
class PlanBuilt(BaseConsoleEvent):
plan: SQLMeshPlan

ConsoleEvent = (
StartPlanEvaluation
| StopPlanEvaluation
Expand Down Expand Up @@ -263,6 +267,7 @@ class ShowTableDiffSummary(BaseConsoleEvent):
| ConsoleException
| PrintEnvironments
| ShowTableDiffSummary
| PlanBuilt
)

ConsoleEventHandler = t.Callable[[ConsoleEvent], None]
Expand Down Expand Up @@ -424,7 +429,22 @@ def add_handler(self, handler: ConsoleEventHandler) -> str:

def remove_handler(self, handler_id: str) -> None:
del self._handlers[handler_id]


def plan(self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: str | None, no_diff: bool = False, no_prompts: bool = False) -> None:
"""Plan is not a console event. This triggers building of a plan and
applying said plan

This method is called by SQLMesh to start the plan process (when you
call Context#plan)

This overriden method ignores the options passed in at this time
"""

plan_builder.apply()

def capture_built_plan(self, plan: SQLMeshPlan) -> None:
"""Capture the built plan and publish a PlanBuilt event."""
self.publish(PlanBuilt(plan=plan))

class EventConsole(IntrospectingConsole):
"""
Expand Down
33 changes: 21 additions & 12 deletions dagster_sqlmesh/controller/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sqlmesh.core.console import set_console
from sqlmesh.core.context import Context
from sqlmesh.core.model import Model
from sqlmesh.core.plan import PlanBuilder
from sqlmesh.core.plan import Plan as SQLMeshPlan, PlanBuilder
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.date import TimeLike

Expand All @@ -19,7 +19,6 @@
ConsoleEventHandler,
ConsoleException,
EventConsole,
Plan,
SnapshotCategorizer,
)
from dagster_sqlmesh.events import ConsoleGenerator
Expand Down Expand Up @@ -179,16 +178,7 @@ def run_sqlmesh_thread(
) -> None:
logger.debug("dagster-sqlmesh: thread started")

def auto_execute_plan(event: ConsoleEvent):
if isinstance(event, Plan):
try:
event.plan_builder.apply()
except Exception as e:
controller.console.exception(e)
return None

try:
controller.console.add_handler(auto_execute_plan)
builder = t.cast(
PlanBuilder,
context.plan_builder(
Expand Down Expand Up @@ -381,6 +371,11 @@ def non_external_models_dag(self) -> t.Iterable[tuple[Model, set[str]]]:
continue
yield (model, deps)


class ContextApplyFunction(t.Protocol):
def __call__(self, plan: SQLMeshPlan, *args: t.Any, **kwargs: t.Any) -> None:
...

class SQLMeshController(t.Generic[ContextCls]):
"""Allows control of sqlmesh via a python interface. It is not suggested to
use the constructor of this class directly, but instead use the provided
Expand Down Expand Up @@ -480,7 +475,21 @@ def _create_context(self) -> ContextCls:
if self.config.sqlmesh_config:
options["config"] = self.config.sqlmesh_config
set_console(self.console)
return self._context_factory(**options)
context = self._context_factory(**options)

# As part of the context, it specifies a method "apply" that we would
# like to introspect. To do so we replace "apply" with a wrapped
# function that generates a special console event unique to
# dagster-sqlmesh
def wrap_apply_event(f: ContextApplyFunction) -> ContextApplyFunction:
def wrapped_apply(plan: SQLMeshPlan, *args: t.Any, **kwargs: t.Any):
self.logger.debug("capturing plan as event")
self.console.capture_built_plan(plan)
result = f(plan, *args, **kwargs)
return result
return wrapped_apply
context.apply = wrap_apply_event(context.apply)
return context

@contextmanager
def instance(
Expand Down
Loading