diff --git a/dagster_sqlmesh/asset.py b/dagster_sqlmesh/asset.py index 586c9ba..586fe27 100644 --- a/dagster_sqlmesh/asset.py +++ b/dagster_sqlmesh/asset.py @@ -25,6 +25,8 @@ def sqlmesh_assets( op_tags: t.Optional[t.Mapping[str, t.Any]] = None, required_resource_keys: t.Optional[t.Set[str]] = None, retry_policy: t.Optional[RetryPolicy] = None, + # For now we don't set this by default + enabled_subsetting: bool = False, ): controller = DagsterSQLMeshController.setup_with_config(config) if not dagster_sqlmesh_translator: @@ -39,5 +41,6 @@ def sqlmesh_assets( op_tags=op_tags, compute_kind=compute_kind, retry_policy=retry_policy, + can_subset=enabled_subsetting, required_resource_keys=required_resource_keys, ) diff --git a/dagster_sqlmesh/console.py b/dagster_sqlmesh/console.py index 3ca0c49..7ee1ce2 100644 --- a/dagster_sqlmesh/console.py +++ b/dagster_sqlmesh/console.py @@ -335,6 +335,7 @@ def __init__(self, log_override: t.Optional[logging.Logger] = None): self._handlers: Dict[str, ConsoleEventHandler] = {} self.logger = log_override or logger self.id = str(uuid.uuid4()) + self.logger.debug(f"EventConsole[{self.id}]: created") self.categorizer = None def add_snapshot_categorizer(self, categorizer: SnapshotCategorizer): @@ -459,7 +460,9 @@ def plan( no_diff: bool = False, no_prompts: bool = False, ) -> None: + self.logger.debug("building plan created") plan = plan_builder.build() + self.logger.debug(f"plan created: {plan}") for snapshot in plan.uncategorized: if self.categorizer: diff --git a/dagster_sqlmesh/controller/base.py b/dagster_sqlmesh/controller/base.py index 05ecd0a..eb9aeac 100644 --- a/dagster_sqlmesh/controller/base.py +++ b/dagster_sqlmesh/controller/base.py @@ -164,6 +164,7 @@ def run_sqlmesh_thread( plan_options: PlanOptions, default_catalog: str, ): + logger.debug("dagster-sqlmesh: thread started") try: builder = t.cast( PlanBuilder, @@ -180,6 +181,8 @@ def run_sqlmesh_thread( ) except Exception as e: controller.console.exception(e) + except: # noqa: E722 + controller.console.exception(Exception("Unknown error during plan")) generator = ConsoleGenerator(self.logger) @@ -187,6 +190,7 @@ def run_sqlmesh_thread( self.console.add_snapshot_categorizer(categorizer) with self.console_context(generator): + self.logger.debug("starting sqlmesh plan thread") thread = threading.Thread( target=run_sqlmesh_thread, args=( @@ -200,6 +204,7 @@ def run_sqlmesh_thread( ) thread.start() + self.logger.debug("waiting for events") for event in generator.events(thread): match event: case ConsoleException(e): @@ -243,6 +248,8 @@ def run_sqlmesh_thread( context.run(environment=environment, **run_options) except Exception as e: controller.console.exception(e) + except: # noqa: E722 + controller.console.exception(Exception("Unknown error during plan")) generator = ConsoleGenerator(self.logger) with self.console_context(generator): @@ -277,8 +284,17 @@ def plan_and_run( run_options = run_options or {} plan_options = plan_options or {} - yield from self.plan(categorizer, default_catalog, **plan_options) - yield from self.run(**run_options) + try: + self.logger.debug("starting sqlmesh plan") + yield from self.plan(categorizer, default_catalog, **plan_options) + self.logger.debug("starting sqlmesh run") + yield from self.run(**run_options) + except Exception as e: + self.logger.error(f"Error during sqlmesh plan and run: {e}") + raise e + except: + self.logger.error("Error during sqlmesh plan and run") + raise def models(self): return self.context.models diff --git a/dagster_sqlmesh/events.py b/dagster_sqlmesh/events.py index a322bc2..48430e5 100644 --- a/dagster_sqlmesh/events.py +++ b/dagster_sqlmesh/events.py @@ -73,7 +73,7 @@ def events(self, thread: threading.Thread) -> Iterator[console.ConsoleEvent]: while thread.is_alive() or not self._queue.empty(): try: # Get arguments from the queue with a timeout - args = self._queue.get(timeout=0.1) + args = self._queue.get(timeout=0.5) yield args except queue.Empty: continue diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index d9a6988..316d078 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -120,22 +120,27 @@ def process_events( notify = self._tracker.notify_queue_next() while notify is not None: completed_name, update_status = notify + + # If the model is not in the context, we can skip any notification + # This will happen for external models if not sqlmesh_context.get_model(completed_name): notify = self._tracker.notify_queue_next() continue - model = self._models_map[completed_name] - output_key = sqlmesh_model_name_to_key(model.name) - asset_key = self._context.asset_key_for_output(output_key) - # asset_key = translator.get_asset_key_from_model( - # controller.context, model - # ) - yield MaterializeResult( - asset_key=asset_key, - metadata={ - "updated": update_status, - "duration_ms": 0, - }, - ) + + model = self._models_map.get(completed_name) + + # We allow selecting models. That value is mapped to models_map. + # If the model is not in models_map, we can skip any notification + if model: + output_key = sqlmesh_model_name_to_key(model.name) + asset_key = self._context.asset_key_for_output(output_key) + yield MaterializeResult( + asset_key=asset_key, + metadata={ + "updated": update_status, + "duration_ms": 0, + }, + ) notify = self._tracker.notify_queue_next() def report_event(self, event: console.ConsoleEvent): @@ -189,14 +194,14 @@ def report_event(self, event: console.ConsoleEvent): raise Exception("sqlmesh failed during run") case console.LogError(message): log_context.error( - message, + f"sqlmesh reported an error: {message}", ) case console.LogFailedModels(models): - log_context.error( - "\n".join( + if len(models) != 0: + failed_models = "\n".join( [f"{str(model)}\n{str(model.__cause__)}" for model in models] - ), - ) + ) + log_context.error(f"sqlmesh failed models: {failed_models}") case _: log_context.debug("Received event") @@ -244,6 +249,8 @@ def run( dag = mesh.models_dag() plan_options["select_models"] = [] + plan_options["backfill_models"] = [] + run_options["select_models"] = [] models = mesh.models() models_map = models.copy() @@ -254,7 +261,12 @@ def run( sqlmesh_model_name_to_key(model.name) in context.selected_output_names ): + logger.info(f"selected model: {model.name}") + models_map[key] = model + plan_options["select_models"].append(model.name) + plan_options["backfill_models"].append(model.name) + run_options["select_models"].append(model.name) event_handler = DagsterSQLMeshEventHandler( context, models_map, dag, "sqlmesh: " diff --git a/sample/dagster_project/definitions.py b/sample/dagster_project/definitions.py index da72d4d..c6bc2b9 100644 --- a/sample/dagster_project/definitions.py +++ b/sample/dagster_project/definitions.py @@ -51,7 +51,7 @@ def test_source() -> pl.DataFrame: ) -@sqlmesh_assets(environment="dev", config=sqlmesh_config) +@sqlmesh_assets(environment="dev", config=sqlmesh_config, enabled_subsetting=True) def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource): yield from sqlmesh.run(context)