diff --git a/docs/prerequisites.md b/docs/prerequisites.md
index 11acfca64e..540a6b4885 100644
--- a/docs/prerequisites.md
+++ b/docs/prerequisites.md
@@ -4,7 +4,7 @@ This page describes the system prerequisites needed to run SQLMesh and provides
 
 ## SQLMesh prerequisites
 
-You'll need Python 3.8 or higher to use SQLMesh. You can check your python version by running the following command:
+You'll need Python 3.9 or higher to use SQLMesh. You can check your python version by running the following command:
 ```bash
 python3 --version
 ```
diff --git a/pyproject.toml b/pyproject.toml
index ed6532624d..9f2294d4bf 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,6 @@ authors = [{ name = "SQLMesh Contributors" }]
 license = { file = "LICENSE" }
 requires-python = ">= 3.9"
 dependencies = [
-    "astor",
     "click",
     "croniter",
     "duckdb>=0.10.0,!=0.10.3",
@@ -202,7 +201,6 @@ disable_error_code = "annotation-unchecked"
 [[tool.mypy.overrides]]
 module = [
     "api.*",
-    "astor.*",
     "IPython.*",
     "hyperscript.*",
     "py.*",
diff --git a/sqlmesh/core/model/common.py b/sqlmesh/core/model/common.py
index c75531afb8..f03cf49753 100644
--- a/sqlmesh/core/model/common.py
+++ b/sqlmesh/core/model/common.py
@@ -4,7 +4,6 @@
 import typing as t
 from pathlib import Path
 
-from astor import to_source
 from difflib import get_close_matches
 from sqlglot import exp
 from sqlglot.helper import ensure_list
@@ -387,7 +386,7 @@ def get_first_arg(keyword_arg_name: str) -> t.Any:
                         )
 
                     try:
-                        expression = to_source(first_arg)
+                        expression = ast.unparse(t.cast(ast.expr, first_arg))
                         return eval(expression, env, local_env)
                     except Exception:
                         if strict_resolution:
diff --git a/sqlmesh/migrations/v0102_normalize_python_env_payloads.py b/sqlmesh/migrations/v0102_normalize_python_env_payloads.py
new file mode 100644
index 0000000000..12f7da86b4
--- /dev/null
+++ b/sqlmesh/migrations/v0102_normalize_python_env_payloads.py
@@ -0,0 +1,142 @@
+"""Re-normalize python_env payloads using ast.unparse after dropping astor.
+
+SQLMesh previously used the third-party `astor` library to serialise Python
+function source code (`normalize_source`). That library has been replaced with
+the stdlib `ast.unparse`, which produces subtly different text for the same
+AST (e.g. `lambda : x` → `lambda: x`, condensed multi-line signatures, etc.).
+
+Because `python_env` payloads are included in each snapshot's `data_hash`,
+any model that contains Python code (Python models, SQL models with Python
+macros/signals) would otherwise appear as *Directly Modified* after the upgrade,
+potentially triggering a full backfill.
+
+This migration re-normalises every stored `Executable` payload of
+`kind == "definition"` via `ast.unparse(ast.parse(payload))`. The
+subsequent `_migrate_rows` pass then recomputes fingerprints from the updated
+payloads so that they match what the current code produces when loading models
+from disk. The migrated snapshots are flagged `migrated = True`, so no
+unexpected backfills are scheduled.
+
+Payloads that cannot be parsed by the running interpreter are left untouched
+(with a warning) so that one bad payload does not abort the whole migration.
+"""
+
+import ast
+import json
+import logging
+
+from sqlglot import exp
+
+from sqlmesh.utils.migration import index_text_type, blob_text_type
+
+logger = logging.getLogger(__name__)
+
+
+def _normalize_payload(payload):  # type: ignore
+    """Return `payload` re-rendered by ast.unparse, or unchanged if it cannot be parsed."""
+    try:
+        return ast.unparse(ast.parse(payload)).strip()
+    except (SyntaxError, ValueError):
+        # Leaving the payload as-is only risks that one model showing up as modified;
+        # raising here would block the entire state migration.
+        logger.warning("Skipping python_env payload that could not be parsed: %r", payload[:80])
+        return payload
+
+
+def migrate_schemas(engine_adapter, schema, **kwargs):  # type: ignore
+    pass
+
+
+def migrate_rows(engine_adapter, schema, **kwargs):  # type: ignore
+    """Rewrite every snapshot row with ast.unparse-normalized `definition` payloads."""
+    import pandas as pd
+
+    snapshots_table = "_snapshots"
+    if schema:
+        snapshots_table = f"{schema}.{snapshots_table}"
+
+    index_type = index_text_type(engine_adapter.dialect)
+    blob_type = blob_text_type(engine_adapter.dialect)
+
+    new_snapshots = []
+    migration_needed = False
+
+    for (
+        name,
+        identifier,
+        version,
+        snapshot,
+        kind_name,
+        updated_ts,
+        unpaused_ts,
+        ttl_ms,
+        unrestorable,
+        forward_only,
+        dev_version,
+        fingerprint,
+    ) in engine_adapter.fetchall(
+        exp.select(
+            "name",
+            "identifier",
+            "version",
+            "snapshot",
+            "kind_name",
+            "updated_ts",
+            "unpaused_ts",
+            "ttl_ms",
+            "unrestorable",
+            "forward_only",
+            "dev_version",
+            "fingerprint",
+        ).from_(snapshots_table),
+        quote_identifiers=True,
+    ):
+        parsed_snapshot = json.loads(snapshot)
+        python_env = parsed_snapshot["node"].get("python_env") or {}
+        for executable in python_env.values():
+            if executable.get("kind") != "definition":
+                continue
+            new_payload = _normalize_payload(executable["payload"])
+            if new_payload != executable["payload"]:
+                executable["payload"] = new_payload
+                migration_needed = True
+
+        new_snapshots.append(
+            {
+                "name": name,
+                "identifier": identifier,
+                "version": version,
+                "snapshot": json.dumps(parsed_snapshot),
+                "kind_name": kind_name,
+                "updated_ts": updated_ts,
+                "unpaused_ts": unpaused_ts,
+                "ttl_ms": ttl_ms,
+                "unrestorable": unrestorable,
+                "forward_only": forward_only,
+                "dev_version": dev_version,
+                "fingerprint": fingerprint,
+            }
+        )
+
+    if migration_needed and new_snapshots:
+        # Rows are replaced wholesale: delete everything, then re-insert the rewritten set.
+        engine_adapter.delete_from(snapshots_table, "TRUE")
+
+        engine_adapter.insert_append(
+            snapshots_table,
+            pd.DataFrame(new_snapshots),
+            target_columns_to_types={
+                "name": exp.DataType.build(index_type),
+                "identifier": exp.DataType.build(index_type),
+                "version": exp.DataType.build(index_type),
+                "snapshot": exp.DataType.build(blob_type),
+                "kind_name": exp.DataType.build(index_type),
+                "updated_ts": exp.DataType.build("bigint"),
+                "unpaused_ts": exp.DataType.build("bigint"),
+                "ttl_ms": exp.DataType.build("bigint"),
+                "unrestorable": exp.DataType.build("boolean"),
+                "forward_only": exp.DataType.build("boolean"),
+                "dev_version": exp.DataType.build(index_type),
+                "fingerprint": exp.DataType.build(blob_type),
+            },
+        )
diff --git a/sqlmesh/utils/metaprogramming.py b/sqlmesh/utils/metaprogramming.py
index cd77c36353..a5bd376566 100644
--- a/sqlmesh/utils/metaprogramming.py
+++ b/sqlmesh/utils/metaprogramming.py
@@ -17,8 +17,6 @@
 from numbers import Number
 from pathlib import Path
 
-from astor import to_source
-
 from sqlmesh.core import constants as c
 from sqlmesh.utils import format_exception, unique
 from sqlmesh.utils.errors import SQLMeshError
@@ -267,14 +265,19 @@ def normalize_source(obj: t.Any) -> str:
 
             # remove docstrings
             body = node.body
-            if body and isinstance(body[0], ast.Expr) and isinstance(body[0].value, ast.Str):
+            if (
+                body
+                and isinstance(body[0], ast.Expr)
+                and isinstance(body[0].value, ast.Constant)
+                and isinstance(body[0].value.value, str)
+            ):
                 node.body = body[1:]
 
             # remove function return type annotation
             if isinstance(node, ast.FunctionDef):
                 node.returns = None
 
-    return to_source(root_node).strip()
+    return ast.unparse(root_node).strip()
 
 
 def build_env(
diff --git a/tests/utils/test_metaprogramming.py b/tests/utils/test_metaprogramming.py
index 9a6f0c95cd..1f1431b963 100644
--- a/tests/utils/test_metaprogramming.py
+++ b/tests/utils/test_metaprogramming.py
@@ -1,3 +1,4 @@
+import ast
 import typing as t
 from contextlib import contextmanager
 from dataclasses import dataclass
@@ -50,7 +51,7 @@ def test_print_exception(mocker: MockerFixture):
     except Exception as ex:
         print_exception(ex, test_env, out_mock)
 
-    expected_message = r"""  File ".*?.tests.utils.test_metaprogramming\.py", line 49, in test_print_exception
+    expected_message = r"""  File ".*?.tests.utils.test_metaprogramming\.py", line 50, in test_print_exception
     eval\("test_fun\(\)", env\).*
 
   File '/test/path.py' \(or imported file\), line 2, in test_fun
@@ -220,8 +221,7 @@ def closure() -> int:
 def test_normalize_source() -> None:
     assert (
         normalize_source(main_func)
-        == """def main_func(y: int, foo=exp.true(), *, bar=expressions.Literal.number(1) + 2
-    ):
+        == """def main_func(y: int, foo=exp.true(), *, bar=expressions.Literal.number(1) + 2):
     sqlglot.parse_one('1')
     MyClass(47)
     DataClass(x=y)
@@ -271,8 +271,7 @@ def test_serialize_env() -> None:
             name="main_func",
             alias="MAIN",
             path="test_metaprogramming.py",
-            payload="""def main_func(y: int, foo=exp.true(), *, bar=expressions.Literal.number(1) + 2
-    ):
+            payload="""def main_func(y: int, foo=exp.true(), *, bar=expressions.Literal.number(1) + 2):
     sqlglot.parse_one('1')
     MyClass(47)
     DataClass(x=y)
@@ -370,7 +369,8 @@ def sample_context_manager():
         "my_lambda": Executable(
             name="my_lambda",
             path="test_metaprogramming.py",
-            payload="my_lambda = lambda : print('z')",
+            # Match normalize_source output across Python versions
+            payload=ast.unparse(ast.parse("my_lambda = lambda: print('z')")).strip(),
         ),
         "normalize_model_name": Executable(
             payload="from sqlmesh.core.dialect import normalize_model_name",