Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dagster_sqlmesh/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def sqlmesh_assets(
op_tags: t.Optional[t.Mapping[str, t.Any]] = None,
required_resource_keys: t.Optional[t.Set[str]] = None,
retry_policy: t.Optional[RetryPolicy] = None,
# For now we don't set this by default
enabled_subsetting: bool = False,
):
controller = DagsterSQLMeshController.setup_with_config(config)
if not dagster_sqlmesh_translator:
Expand All @@ -39,5 +41,6 @@ def sqlmesh_assets(
op_tags=op_tags,
compute_kind=compute_kind,
retry_policy=retry_policy,
can_subset=enabled_subsetting,
required_resource_keys=required_resource_keys,
)
3 changes: 3 additions & 0 deletions dagster_sqlmesh/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ def __init__(self, log_override: t.Optional[logging.Logger] = None):
self._handlers: Dict[str, ConsoleEventHandler] = {}
self.logger = log_override or logger
self.id = str(uuid.uuid4())
self.logger.debug(f"EventConsole[{self.id}]: created")
self.categorizer = None

def add_snapshot_categorizer(self, categorizer: SnapshotCategorizer):
Expand Down Expand Up @@ -459,7 +460,9 @@ def plan(
no_diff: bool = False,
no_prompts: bool = False,
) -> None:
self.logger.debug("building plan created")
plan = plan_builder.build()
self.logger.debug(f"plan created: {plan}")

for snapshot in plan.uncategorized:
if self.categorizer:
Expand Down
20 changes: 18 additions & 2 deletions dagster_sqlmesh/controller/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def run_sqlmesh_thread(
plan_options: PlanOptions,
default_catalog: str,
):
logger.debug("dagster-sqlmesh: thread started")
try:
builder = t.cast(
PlanBuilder,
Expand All @@ -180,13 +181,16 @@ def run_sqlmesh_thread(
)
except Exception as e:
controller.console.exception(e)
except: # noqa: E722
controller.console.exception(Exception("Unknown error during plan"))

generator = ConsoleGenerator(self.logger)

if categorizer:
self.console.add_snapshot_categorizer(categorizer)

with self.console_context(generator):
self.logger.debug("starting sqlmesh plan thread")
thread = threading.Thread(
target=run_sqlmesh_thread,
args=(
Expand All @@ -200,6 +204,7 @@ def run_sqlmesh_thread(
)
thread.start()

self.logger.debug("waiting for events")
for event in generator.events(thread):
match event:
case ConsoleException(e):
Expand Down Expand Up @@ -243,6 +248,8 @@ def run_sqlmesh_thread(
context.run(environment=environment, **run_options)
except Exception as e:
controller.console.exception(e)
except: # noqa: E722
controller.console.exception(Exception("Unknown error during plan"))

generator = ConsoleGenerator(self.logger)
with self.console_context(generator):
Expand Down Expand Up @@ -277,8 +284,17 @@ def plan_and_run(
run_options = run_options or {}
plan_options = plan_options or {}

yield from self.plan(categorizer, default_catalog, **plan_options)
yield from self.run(**run_options)
try:
self.logger.debug("starting sqlmesh plan")
yield from self.plan(categorizer, default_catalog, **plan_options)
self.logger.debug("starting sqlmesh run")
yield from self.run(**run_options)
except Exception as e:
self.logger.error(f"Error during sqlmesh plan and run: {e}")
raise e
except:
self.logger.error("Error during sqlmesh plan and run")
raise

def models(self):
return self.context.models
Expand Down
2 changes: 1 addition & 1 deletion dagster_sqlmesh/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def events(self, thread: threading.Thread) -> Iterator[console.ConsoleEvent]:
while thread.is_alive() or not self._queue.empty():
try:
# Get arguments from the queue with a timeout
args = self._queue.get(timeout=0.1)
args = self._queue.get(timeout=0.5)
yield args
except queue.Empty:
continue
Expand Down
48 changes: 30 additions & 18 deletions dagster_sqlmesh/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,27 @@ def process_events(
notify = self._tracker.notify_queue_next()
while notify is not None:
completed_name, update_status = notify

# If the model is not in the context, we can skip its notification.
# This happens for external models.
if not sqlmesh_context.get_model(completed_name):
notify = self._tracker.notify_queue_next()
continue
model = self._models_map[completed_name]
output_key = sqlmesh_model_name_to_key(model.name)
asset_key = self._context.asset_key_for_output(output_key)
# asset_key = translator.get_asset_key_from_model(
# controller.context, model
# )
yield MaterializeResult(
asset_key=asset_key,
metadata={
"updated": update_status,
"duration_ms": 0,
},
)

model = self._models_map.get(completed_name)

# Models can be selected individually; the selection is reflected in models_map.
# If the model is not in models_map, we can skip its notification.
if model:
output_key = sqlmesh_model_name_to_key(model.name)
asset_key = self._context.asset_key_for_output(output_key)
yield MaterializeResult(
asset_key=asset_key,
metadata={
"updated": update_status,
"duration_ms": 0,
},
)
notify = self._tracker.notify_queue_next()

def report_event(self, event: console.ConsoleEvent):
Expand Down Expand Up @@ -189,14 +194,14 @@ def report_event(self, event: console.ConsoleEvent):
raise Exception("sqlmesh failed during run")
case console.LogError(message):
log_context.error(
message,
f"sqlmesh reported an error: {message}",
)
case console.LogFailedModels(models):
log_context.error(
"\n".join(
if len(models) != 0:
failed_models = "\n".join(
[f"{str(model)}\n{str(model.__cause__)}" for model in models]
),
)
)
log_context.error(f"sqlmesh failed models: {failed_models}")
case _:
log_context.debug("Received event")

Expand Down Expand Up @@ -244,6 +249,8 @@ def run(
dag = mesh.models_dag()

plan_options["select_models"] = []
plan_options["backfill_models"] = []
run_options["select_models"] = []

models = mesh.models()
models_map = models.copy()
Expand All @@ -254,7 +261,12 @@ def run(
sqlmesh_model_name_to_key(model.name)
in context.selected_output_names
):
logger.info(f"selected model: {model.name}")

models_map[key] = model
plan_options["select_models"].append(model.name)
plan_options["backfill_models"].append(model.name)
run_options["select_models"].append(model.name)

event_handler = DagsterSQLMeshEventHandler(
context, models_map, dag, "sqlmesh: "
Expand Down
2 changes: 1 addition & 1 deletion sample/dagster_project/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_source() -> pl.DataFrame:
)


@sqlmesh_assets(environment="dev", config=sqlmesh_config)
@sqlmesh_assets(environment="dev", config=sqlmesh_config, enabled_subsetting=True)
def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource):
yield from sqlmesh.run(context)

Expand Down