Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sqlmesh/core/model/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,9 @@ def parse_expression(

if isinstance(v, list):
return [
d.parse_one(e, dialect=dialect) if not isinstance(e, exp.Expression) else e for e in v
e if isinstance(e, exp.Expression) else d.parse_one(e, dialect=dialect)
for e in v
if not isinstance(e, exp.Semicolon)
]

if isinstance(v, str):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
This script's goal is to warn users if there are two adjacent expressions in a SQL
model that are equivalent.

Context:

We used to include `Semicolon` expressions in the model's state, which led to a bug
where the expression preceding the semicolon would be duplicated in pre_statements
or post_statements. For example, the query in the model below would be incorrectly
included in its post_statements list:

```
MODEL (
name test
);

SELECT 1 AS c;

-- foo
```

We now don't include `Semicolon` expressions in the model's state, which fixes this
issue, but unfortunately migrating existing snapshots is not possible because we do
not have a signal in state to detect whether an expression was incorrectly duplicated.

If a SQL model suffered from this issue, then there would be two adjacent equivalent
expressions in it, so we use that as a heuristic to warn the user accordingly.
"""

import json

from sqlglot import exp

from sqlmesh.core.console import get_console


def migrate(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
if schema:
snapshots_table = f"{schema}.{snapshots_table}"

warning = (
"SQLMesh detected that it may not be able to fully migrate the state database. This should not impact "
"the migration process, but may result in unexpected changes being reported by the next `sqlmesh plan` "
"command. Please run `sqlmesh diff prod` after the migration has completed, before making any new "
"changes. If any unexpected changes are reported, consider running a forward-only plan to apply these "
"changes and avoid unnecessary backfills: sqlmesh plan prod --forward-only. "
"See https://sqlmesh.readthedocs.io/en/stable/concepts/plans/#forward-only-plans for more details.\n"
)

for (snapshot,) in engine_adapter.fetchall(
exp.select("snapshot").from_(snapshots_table), quote_identifiers=True
):
parsed_snapshot = json.loads(snapshot)
node = parsed_snapshot["node"]

if node.get("source_type") == "sql":
expressions = [
*node.get("pre_statements", []),
node["query"],
*node.get("post_statements", []),
]
for e1, e2 in zip(expressions, expressions[1:]):
if e1 == e2:
get_console().log_warning(warning)
return
93 changes: 93 additions & 0 deletions tests/core/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9332,6 +9332,7 @@ def test_python_env_references_are_unequal_but_point_to_same_definition(tmp_path

db_path = str(tmp_path / "db.db")
db_connection = DuckDBConnectionConfig(database=db_path)

config = Config(
gateways={"duckdb": GatewayConfig(connection=db_connection)},
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
Expand Down Expand Up @@ -9447,3 +9448,95 @@ def f():

with pytest.raises(SQLMeshError, match=r"duplicate definitions found"):
Context(paths=tmp_path, config=config)


def test_semicolon_is_not_included_in_model_state(tmp_path, assert_exp_eq):
init_example_project(tmp_path, dialect="duckdb", template=ProjectTemplate.EMPTY)

db_connection = DuckDBConnectionConfig(database=str(tmp_path / "db.db"))
config = Config(
gateways={"duckdb": GatewayConfig(connection=db_connection)},
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
)

model_file = tmp_path / "models" / "model_with_semicolon.sql"
model_file.write_text(
"""
MODEL (
name sqlmesh_example.incremental_model_with_semicolon,
kind INCREMENTAL_BY_TIME_RANGE (
time_column event_date
),
start '2020-01-01',
cron '@daily',
grain (id, event_date)
);

SELECT
1 AS id,
1 AS item_id,
CAST('2020-01-01' AS DATE) AS event_date
;

--Just a comment
"""
)

ctx = Context(paths=tmp_path, config=config)
model = ctx.get_model("sqlmesh_example.incremental_model_with_semicolon")

assert not model.pre_statements
assert not model.post_statements

assert_exp_eq(
model.render_query(),
'SELECT 1 AS "id", 1 AS "item_id", CAST(\'2020-01-01\' AS DATE) AS "event_date"',
)
ctx.format()

assert (
model_file.read_text()
== """MODEL (
name sqlmesh_example.incremental_model_with_semicolon,
kind INCREMENTAL_BY_TIME_RANGE (
time_column event_date
),
start '2020-01-01',
cron '@daily',
grain (id, event_date)
);

SELECT
1 AS id,
1 AS item_id,
'2020-01-01'::DATE AS event_date;

/* Just a comment */"""
)

ctx.plan(no_prompts=True, auto_apply=True)

model_file = tmp_path / "models" / "model_with_semicolon.sql"
model_file.write_text(
"""
MODEL (
name sqlmesh_example.incremental_model_with_semicolon,
kind INCREMENTAL_BY_TIME_RANGE (
time_column event_date
),
start '2020-01-01',
cron '@daily',
grain (id, event_date)
);

SELECT
1 AS id,
1 AS item_id,
CAST('2020-01-01' AS DATE) AS event_date
"""
)

ctx.load()
plan = ctx.plan(no_prompts=True, auto_apply=True)

assert not plan.context_diff.modified_snapshots