Accessing the schema of an upstream model in a SQL model defined with Python

Accessing the schema of an upstream model doesn't quite work for SQL models defined with Python. Here is a code repro.
```python
import os
from pathlib import Path
from tempfile import TemporaryDirectory

from sqlmesh.core.config import (
    Config,
    GatewayConfig,
    ModelDefaultsConfig,
    DuckDBConnectionConfig,
)
from sqlmesh.core.context import Context

folder = TemporaryDirectory()
tmp_path = Path(folder.name)
print("TemporaryDirectory: ", tmp_path)

first_model_definition = """
MODEL (
    name FEATURES.FIRST,
    kind VIEW,
    grain ID,
);

SELECT
    1 AS ID, /* Customer ID */
    '[email protected]' AS EMAIL, /* Email address of the customer */
"""

second_model_definition = """
from sqlglot import exp

from sqlmesh.core.macros import MacroEvaluator
from sqlmesh.core.model import model
from sqlmesh.core.model.kind import ModelKindName

PARENT_MODELS = ["FEATURES.FIRST"]


@model(
    "FEATURES.SECOND",
    is_sql=True,
    kind={"name": ModelKindName.FULL},
    depends_on=PARENT_MODELS,
)
def entrypoint(evaluator: MacroEvaluator) -> exp.Expression:
    dict_types = evaluator.columns_to_types(PARENT_MODELS[0])
    print(dict_types)
    # `.this` holds the DataType.Type enum member of the column's type
    if dict_types["id"].this == exp.DataType.Type.TEXT:
        return exp.select("'1'::TEXT as col")
    else:
        return exp.select("1 as col")
"""

os.makedirs(tmp_path / "models", exist_ok=True)
first_model_path = tmp_path / "models" / "first.sql"
first_model_path.write_text(first_model_definition)
second_model_path = tmp_path / "models" / "second.py"
second_model_path.write_text(second_model_definition)

db_path = str(tmp_path / "repro_db.db")
config = Config(
    gateways={
        "main": GatewayConfig(connection=DuckDBConnectionConfig(database=db_path))
    },
    model_defaults=ModelDefaultsConfig(dialect="duckdb"),
)

context = Context(paths=tmp_path, config=config)
context.plan(auto_apply=True, no_prompts=True)
```
And the error traceback:
```
Traceback (most recent call last):
  File "/home/ubuntu/repos/sqlmesh-repro/.venv/lib/python3.12/site-packages/sqlmesh/core/macros.py", line 219, in send
    return func(*bound.args, **bound.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File 'models/second.py' (or imported file), line 4, in entrypoint
    def entrypoint(evaluator: MacroEvaluator):
        dict_types = evaluator.columns_to_types(PARENT_MODELS[0])
        print(dict_types)
        if dict_types["id"].this == exp.DataType.Type.TEXT:
KeyError: 'id'
```
I traced the issue to this method call when SQLMesh loads models - https://github.com/TobikoData/sqlmesh/blob/e27daf54c19ba1c3d0fe78a86f5b928b3795630e/sqlmesh/core/model/definition.py#L840

Since the SQL-defined-in-Python model specifies its dependencies using the `depends_on` argument, SQLMesh shouldn't need to render the query to fetch the dependencies again. That rendering is what causes the issue: the first model's schema hasn't been added to the registry yet, and rendering the Python model's query needs that information.
This looks a bit intractable. Looking at all the tests that go through the above code path (deriving model dependencies by rendering the query even when the `depends_on` argument is user-specified), they have the model's SQL query depend on:

- another model not listed in the `depends_on` argument, or
- the model itself
- `tests/core/test_model.py::test_user_provided_depends_on`
- `tests/core/test_model.py::test_model_normalization`
- `tests/core/test_plan.py::test_auto_categorization_missing_schema_downstream`
- `tests/core/test_plan.py::test_add_restatements[Simple dependency with leaf depends on self]`
- `tests/core/test_plan.py::test_add_restatements[ a -> c -> d b -> c -> e -> g b -> f -> g c depends on self restate a and b ]`
- `tests/core/test_plan.py::test_add_restatements[ a -> c -> d b -> c -> e -> g b -> f -> g c depends on self restate e ]`
To resolve `evaluator.columns_to_types(model_name)`, SQLMesh needs to figure out the DAG structure first so it can propagate column types - link. However, that isn't doable until the dependencies for each model file have been computed. So we have a circular dependency in the load process.
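To make the circularity concrete, here is a self-contained toy sketch of the two load steps (illustrative only, assuming a simplified picture of the loader; none of these names are SQLMesh internals):

```python
# Toy illustration of the load-time ordering problem -- NOT SQLMesh code.
schemas: dict[str, dict[str, str]] = {}  # model name -> {column: type}


def columns_to_types(name: str) -> dict[str, str]:
    # Stand-in for evaluator.columns_to_types: only answerable once the
    # upstream model's schema has been registered.
    return schemas[name]  # raises KeyError, just like the repro above


def render_second() -> str:
    # Mirrors the repro: rendering FEATURES.SECOND's query needs the
    # schema of FEATURES.FIRST.
    types = columns_to_types("FEATURES.FIRST")
    return "SELECT '1'::TEXT AS col" if types["id"] == "TEXT" else "SELECT 1 AS col"


# Step 1 of loading derives dependencies by rendering each query...
try:
    render_second()
except KeyError as exc:  # ...which fails: the schema isn't registered yet.
    print("KeyError:", exc)

# Step 2 (schema propagation over the DAG) only runs after step 1, because
# the DAG is built from the dependencies that step 1 discovers.
schemas["FEATURES.FIRST"] = {"id": "INT", "email": "TEXT"}
print(render_second())  # with the schema registered, rendering succeeds
```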
https://sqlmesh.readthedocs.io/en/stable/concepts/models/python_models/#dependencies
The documentation for Python models says that specifying `depends_on` takes precedence over SQLMesh resolving the dependencies. Is it okay to extend this property to SQL models defined in Python? The other options don't feel great:
- introduce another parameter like `dont_recompute_deps_if_defined`, or
- raise a `ParsetimeAdapterCallError` if `columns_to_types` is called before the macro evaluator has schemas populated, and bubble it up. Link
> Documentation for Python models says that specifying `depends_on` takes precedence over SQLMesh resolving the dependencies.
Yeah, I think this documentation is out of date.
I think there's a bit of a chicken-and-egg problem.

@georgesittas, I think it's fundamentally wrong to access `evaluator` in the Python SQL model entrypoint. Instead, the model should return an expression that contains a macro call, which in turn uses `evaluator`. The entrypoint isn't supposed to evaluate macros in place. IMHO, the evaluator should not be used to construct the expression; it should only be used to render it.
@ananis25, I suggest creating a macro like:
```python
from sqlglot import exp, parse_one
from sqlmesh.core.macros import macro

PARENT_MODELS = ["FEATURES.FIRST"]

@macro()
def my_macro(evaluator, col_name):
    dict_types = evaluator.columns_to_types(PARENT_MODELS[0])
    # `.this` holds the DataType.Type enum member of the column's type
    if dict_types[col_name.name].this == exp.DataType.Type.TEXT:
        return exp.cast("'1'", exp.DataType.build("text"))
    else:
        return parse_one("'1'")
```
And then call it in the model's SQL:
```python
@model(
    "FEATURES.SECOND",
    is_sql=True,
    kind={"name": ModelKindName.FULL},
    depends_on=PARENT_MODELS,
)
def entrypoint(evaluator: MacroEvaluator) -> exp.Expression:
    return exp.select("@my_macro('id') AS col")
```
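The intent of this deferral is that the entrypoint only constructs the query's AST, while the embedded `@my_macro` call is expanded later, when the query is rendered with the evaluator's schema information (though, as the follow-up below notes, rendering can also happen at load time).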
@georgesittas, @tobymao, I suggest removing the evaluator reference from the Python SQL model interface. I believe it's wrong.
After additional deliberation, it turned out that I was wrong in my initial conclusion.

@ananis25, your initial implementation should work, with one caveat: `columns_to_types` will not be available when the query rendering happens at project load time (as opposed to run time).

In this case, the `columns_to_types` dictionary will contain a special placeholder key `__schema_unavailable_at_load__` indicating the schema's unavailability. You need to add logic to account for this placeholder as part of the model/macro implementation.

Apologies for providing misleading guidance previously.
@georgesittas, let's use this issue as an opportunity to document this behavior to prevent confusion in the future.
Thanks. Toby suggested the same thing, but the schema of my actual model really depends on the upstream column types/comments. Returning a dummy query earlier (`SELECT 1` or `None`) and the actual query later raised SQLMesh errors elsewhere.
I got around it by:

- Prefixing all model files with their depth in the full DAG - `1_model.sql`, `1_model2.py`, `2_model3.sql`, and so on. This lets us infer the DAG order statically.
- Creating a context object that only includes the dependencies of the current model, and leveraging the SQLMesh methods on it.

(Edited: cleaner impl)
```python
def fetch_column_types_and_comments(
    source_name: str, parent_model_names: list[str], self_file_path: str
) -> list["ModelData"]:
    """Fetch the column types and comments for SQLMesh models upstream of the caller.

    Used to create dynamic models.

    Args:
        source_name: The name of the source.
        parent_model_names: The names of the models we want to fetch the schema for.
        self_file_path: Path of the calling model file; files at its DAG level and
            downstream are skipped when constructing the context to fetch the model
            schema. Critical, so we don't end up in a recursive loop.
            Link - https://github.com/TobikoData/sqlmesh/issues/2900

    Returns:
        A list of (dict-types, dict-comments).
    """
    self_dag_level = int(os.path.basename(self_file_path).split("_", maxsplit=1)[0])
    skip_patterns = [f"{n}_*" for n in range(self_dag_level, MAX_DAG_DEPTH)]
    fake_ctx = make_sqlmesh_context(source_name, make_dummy_config(skip_patterns))

    list_model_info = []
    for model_name in parent_model_names:
        model = fake_ctx.get_model(model_name)
        assert model is not None
        assert model.columns_to_types is not None
        types = {
            k: v.sql(dialect=fake_ctx.default_dialect)
            for k, v in model.columns_to_types.items()
        }
        descr = model.column_descriptions
        list_model_info.append(
            ModelData(
                name=model_name,
                columns_to_sql_types=dict(sorted(types.items(), key=lambda x: x[0])),
                columns_to_comments=dict(sorted(descr.items(), key=lambda x: x[0])),
            )
        )
    return list_model_info
```
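For illustration, a call from a model file at DAG depth 2 might look like the following (hypothetical file and source names; `ModelData`, `make_sqlmesh_context`, and `make_dummy_config` are my own helpers referenced above):

```python
# Hypothetical usage from a model file named "models/2_third.py". Its depth
# prefix is 2, so every file matching "2_*" or deeper is skipped when the
# throwaway context is built, and loading never recurses into the caller.
parent_info = fetch_column_types_and_comments(
    source_name="my_source",                # assumed source name
    parent_model_names=["FEATURES.FIRST"],  # upstream models at depth < 2
    self_file_path=__file__,
)
for info in parent_info:
    print(info.name, info.columns_to_sql_types, info.columns_to_comments)
```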
I've had some time to look at this now, and the suggestion does work. You just need to check whether the placeholder key is in the dict and return a dummy query if so. Your example worked with this check:
```python
from sqlglot import exp

from sqlmesh.core.macros import MacroEvaluator
from sqlmesh.core.model import model
from sqlmesh.core.model.kind import ModelKindName

PARENT_MODELS = ["FEATURES.FIRST"]


@model(
    "FEATURES.SECOND",
    is_sql=True,
    kind={"name": ModelKindName.FULL},
    depends_on=PARENT_MODELS,
)
def entrypoint(evaluator: MacroEvaluator) -> exp.Expression:
    dict_types = evaluator.columns_to_types(PARENT_MODELS[0])
    # At load time the schema isn't known yet; fall back to a dummy query.
    if "__schema_unavailable_at_load__" in dict_types:
        return exp.select("1")
    return exp.select("'1'::int as ID")
```