From ffab5148b4fc80ffae8d45eb5c3d0b2b3a5c1298 Mon Sep 17 00:00:00 2001 From: George Sittas Date: Sat, 26 Apr 2025 01:45:30 +0300 Subject: [PATCH 1/2] Fix!: exclude Semicolon expressions from model state --- sqlmesh/core/model/common.py | 4 +- tests/core/test_model.py | 93 ++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/model/common.py b/sqlmesh/core/model/common.py index 9e0aed8783..61d331f306 100644 --- a/sqlmesh/core/model/common.py +++ b/sqlmesh/core/model/common.py @@ -262,7 +262,9 @@ def parse_expression( if isinstance(v, list): return [ - d.parse_one(e, dialect=dialect) if not isinstance(e, exp.Expression) else e for e in v + e if isinstance(e, exp.Expression) else d.parse_one(e, dialect=dialect) + for e in v + if not isinstance(e, exp.Semicolon) ] if isinstance(v, str): diff --git a/tests/core/test_model.py b/tests/core/test_model.py index 2a72903f52..3d9f3a81cf 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -9332,6 +9332,7 @@ def test_python_env_references_are_unequal_but_point_to_same_definition(tmp_path db_path = str(tmp_path / "db.db") db_connection = DuckDBConnectionConfig(database=db_path) + config = Config( gateways={"duckdb": GatewayConfig(connection=db_connection)}, model_defaults=ModelDefaultsConfig(dialect="duckdb"), @@ -9447,3 +9448,95 @@ def f(): with pytest.raises(SQLMeshError, match=r"duplicate definitions found"): Context(paths=tmp_path, config=config) + + +def test_semicolon_is_not_included_in_model_state(tmp_path, assert_exp_eq): + init_example_project(tmp_path, dialect="duckdb", template=ProjectTemplate.EMPTY) + + db_connection = DuckDBConnectionConfig(database=str(tmp_path / "db.db")) + config = Config( + gateways={"duckdb": GatewayConfig(connection=db_connection)}, + model_defaults=ModelDefaultsConfig(dialect="duckdb"), + ) + + model_file = tmp_path / "models" / "model_with_semicolon.sql" + model_file.write_text( + """ + MODEL ( + name sqlmesh_example.incremental_model_with_semicolon, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date + ), + start '2020-01-01', + cron '@daily', + grain (id, event_date) + ); + + SELECT + 1 AS id, + 1 AS item_id, + CAST('2020-01-01' AS DATE) AS event_date + ; + + --Just a comment + """ + ) + + ctx = Context(paths=tmp_path, config=config) + model = ctx.get_model("sqlmesh_example.incremental_model_with_semicolon") + + assert not model.pre_statements + assert not model.post_statements + + assert_exp_eq( + model.render_query(), + 'SELECT 1 AS "id", 1 AS "item_id", CAST(\'2020-01-01\' AS DATE) AS "event_date"', + ) + ctx.format() + + assert ( + model_file.read_text() + == """MODEL ( + name sqlmesh_example.incremental_model_with_semicolon, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date + ), + start '2020-01-01', + cron '@daily', + grain (id, event_date) +); + +SELECT + 1 AS id, + 1 AS item_id, + '2020-01-01'::DATE AS event_date; + +/* Just a comment */""" + ) + + ctx.plan(no_prompts=True, auto_apply=True) + + model_file = tmp_path / "models" / "model_with_semicolon.sql" + model_file.write_text( + """ + MODEL ( + name sqlmesh_example.incremental_model_with_semicolon, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date + ), + start '2020-01-01', + cron '@daily', + grain (id, event_date) + ); + + SELECT + 1 AS id, + 1 AS item_id, + CAST('2020-01-01' AS DATE) AS event_date + """ + ) + + ctx.load() + plan = ctx.plan(no_prompts=True, auto_apply=True) + + assert not plan.context_diff.modified_snapshots From 4ba0256049ab47ac294551688d15acc3f46aab10 Mon Sep 17 00:00:00 2001 From: George Sittas Date: Tue, 6 May 2025 00:25:09 +0300 Subject: [PATCH 2/2] Add migration script to warn --- ...rn_if_incorrectly_duplicated_statements.py | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 sqlmesh/migrations/v0082_warn_if_incorrectly_duplicated_statements.py diff --git a/sqlmesh/migrations/v0082_warn_if_incorrectly_duplicated_statements.py b/sqlmesh/migrations/v0082_warn_if_incorrectly_duplicated_statements.py new file mode 100644 index 0000000000..7fb9affb1d --- /dev/null +++ b/sqlmesh/migrations/v0082_warn_if_incorrectly_duplicated_statements.py @@ -0,0 +1,68 @@ +""" +This script's goal is to warn users if there are two adjacent expressions in a SQL +model that are equivalent. + +Context: + +We used to include `Semicolon` expressions in the model's state, which led to a bug +where the expression preceding the semicolon would be duplicated in pre_statements +or post_statements. For example, the query in the model below would be incorrectly +included in its post_statements list: + +``` +MODEL ( + name test +); + +SELECT 1 AS c; + +-- foo +``` + +We now don't include `Semicolon` expressions in the model's state, which fixes this +issue, but unfortunately migrating existing snapshots is not possible because we do +not have a signal in state to detect whether an expression was incorrectly duplicated. + +If a SQL model suffered from this issue, then there would be two adjacent equivalent +expressions in it, so we use that as a heuristic to warn the user accordingly. +""" + +import json + +from sqlglot import exp + +from sqlmesh.core.console import get_console + + +def migrate(state_sync, **kwargs): # type: ignore + engine_adapter = state_sync.engine_adapter + schema = state_sync.schema + snapshots_table = "_snapshots" + if schema: + snapshots_table = f"{schema}.{snapshots_table}" + + warning = ( + "SQLMesh detected that it may not be able to fully migrate the state database. This should not impact " + "the migration process, but may result in unexpected changes being reported by the next `sqlmesh plan` " + "command. Please run `sqlmesh diff prod` after the migration has completed, before making any new " + "changes. If any unexpected changes are reported, consider running a forward-only plan to apply these " + "changes and avoid unnecessary backfills: sqlmesh plan prod --forward-only. " + "See https://sqlmesh.readthedocs.io/en/stable/concepts/plans/#forward-only-plans for more details.\n" + ) + + for (snapshot,) in engine_adapter.fetchall( + exp.select("snapshot").from_(snapshots_table), quote_identifiers=True + ): + parsed_snapshot = json.loads(snapshot) + node = parsed_snapshot["node"] + + if node.get("source_type") == "sql": + expressions = [ + *node.get("pre_statements", []), + node["query"], + *node.get("post_statements", []), + ] + for e1, e2 in zip(expressions, expressions[1:]): + if e1 == e2: + get_console().log_warning(warning) + return