
Accessing the schema of an upstream model in a sql-model-defined-with-python

Open ananis25 opened this issue 1 year ago • 6 comments

Accessing the schema of an upstream model doesn't quite work for SQL-models-defined-with-python. Here is a code repro.

import os
from pathlib import Path
from tempfile import TemporaryDirectory

from sqlmesh.core.config import (
    Config,
    GatewayConfig,
    ModelDefaultsConfig,
    DuckDBConnectionConfig,
)
from sqlmesh.core.context import Context

folder = TemporaryDirectory()
tmp_path = Path(folder.name)
print("TemporaryDirectory: ", tmp_path)

first_model_definition = """
MODEL (
    name FEATURES.FIRST,
    kind VIEW,
    grain ID,
);

SELECT
    1 AS ID, /* Customer ID */
    '[email protected]' AS EMAIL, /* Email address of the customer */
"""

second_model_definition = """
from sqlglot import exp
from sqlmesh.core.macros import MacroEvaluator
from sqlmesh.core.model import model
from sqlmesh.core.model.kind import ModelKindName

PARENT_MODELS = ["FEATURES.FIRST"]

@model(
    "FEATURES.SECOND",
    is_sql=True,
    kind={"name": ModelKindName.FULL},
    depends_on=PARENT_MODELS,
)
def entrypoint(evaluator: MacroEvaluator) -> exp.Expression:
    dict_types = evaluator.columns_to_types(PARENT_MODELS[0])
    print(dict_types)
    if dict_types["id"].this == exp.DataType.Type.TEXT:
        return exp.select("'1'::TEXT as col")
    else:
        return exp.select("1 as col")
"""

os.makedirs(tmp_path / "models", exist_ok=True)
first_model_path = tmp_path / "models" / "first.sql"
first_model_path.write_text(first_model_definition)
second_model_path = tmp_path / "models" / "second.py"
second_model_path.write_text(second_model_definition)

db_path = str(tmp_path / "repro_db.db")
config = Config(
    gateways={
        "main": GatewayConfig(connection=DuckDBConnectionConfig(database=db_path))
    },
    model_defaults=ModelDefaultsConfig(dialect="duckdb"),
)
context = Context(paths=tmp_path, config=config)
context.plan(auto_apply=True, no_prompts=True)

And the error traceback:

Traceback (most recent call last):

  File "/home/ubuntu/repos/sqlmesh-repro/.venv/lib/python3.12/site-packages/sqlmesh/core/macros.py", line 219, in send
    return func(*bound.args, **bound.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File 'models/second.py' (or imported file), line 4, in entrypoint
    def entrypoint(evaluator: MacroEvaluator):
        dict_types = evaluator.columns_to_types(PARENT_MODELS[0])
        print(dict_types)
        if dict_types['id'].this == exp.DataType.Type.TEXT:


KeyError: 'id'

ananis25 avatar Jul 13 '24 05:07 ananis25

I traced the issue to this method call when SQLMesh loads models - https://github.com/TobikoData/sqlmesh/blob/e27daf54c19ba1c3d0fe78a86f5b928b3795630e/sqlmesh/core/model/definition.py#L840

Since the SQL model defined in Python specifies its dependency via the "depends_on" arg, SQLMesh shouldn't need to render the query again to infer the dependencies. That rendering is what causes the issue: the first model's schema hasn't been added to the registry yet, and rendering the Python model's query requires that schema.

ananis25 avatar Jul 13 '24 06:07 ananis25

This looks a bit intractable. All the tests that exercise the code path above (deriving model dependencies by rendering the query even when the depends_on arg is user specified) have the SQL model depend on either:

  • another model not listed in the depends_on argument, or
  • itself.

tests/core/test_model.py::test_user_provided_depends_on
tests/core/test_model.py::test_model_normalization
tests/core/test_plan.py::test_auto_categorization_missing_schema_downstream
tests/core/test_plan.py::test_add_restatements[Simple dependency with leaf depends on self]
tests/core/test_plan.py::test_add_restatements[ a -> c -> d b -> c -> e -> g b -> f -> g c depends on self restate a and b ]
tests/core/test_plan.py::test_add_restatements[ a -> c -> d b -> c -> e -> g b -> f -> g c depends on self restate e ]

To resolve evaluator.columns_to_types(model_name), SQLMesh needs to figure out the DAG structure first so it can propagate column types - link.

However, that isn't possible until the dependencies of each model file have already been computed. So we have a circular dependency.
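The cycle can be shown with a toy sketch (hypothetical function names, not SQLMesh internals): resolving a schema needs the DAG, building the DAG needs every query rendered, and rendering a query may need a schema.

```python
# Toy illustration of the load-time cycle; hypothetical functions,
# not SQLMesh internals.
schemas: dict[str, dict] = {}  # model name -> columns_to_types

def render_query(name: str) -> str:
    # Rendering FEATURES.SECOND calls columns_to_types on its parent...
    if name == "features.second":
        return f"SELECT ... /* uses {resolve_schema('features.first')} */"
    return "SELECT 1 AS id"

def resolve_schema(name: str) -> dict:
    # ...but schemas are only propagated once the DAG is known...
    if name not in schemas:
        build_dag()
    return schemas[name]

def build_dag() -> None:
    # ...and the DAG is discovered by rendering every model's query,
    # which recurses right back into resolve_schema.
    for name in ("features.first", "features.second"):
        render_query(name)

try:
    render_query("features.second")
except RecursionError:
    print("cycle: rendering needs schemas, schemas need the DAG")
```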

ananis25 avatar Jul 14 '24 20:07 ananis25

https://sqlmesh.readthedocs.io/en/stable/concepts/models/python_models/#dependencies The documentation for Python models says that specifying depends_on takes precedence over SQLMesh resolving the dependencies itself.

Is it okay to extend this behavior to SQL-models-defined-in-python? The other options don't feel great:

  • introduce another parameter like "dont_recompute_deps_if_defined", or
  • raise a ParsetimeAdapterCallError if columns_to_types is called before the macro evaluator has schemas populated, and bubble it up. Link

ananis25 avatar Jul 14 '24 20:07 ananis25

Documentation for Python models says that specifying depends_on takes precedence over SQLMesh resolving the dependencies.

Yeah, I think this documentation is out of date.

I think there's a bit of a chicken and egg problem.

@georgesittas, I think it's fundamentally wrong to access the evaluator in the Python SQL model entrypoint. Instead, the model should return an expression containing a macro call, which in turn uses the evaluator. Macros aren't supposed to be evaluated in place. IMHO, the evaluator should not be used to construct the expression; it should only be used to render it.

@ananis25, I suggest creating a macro like:

from sqlglot import exp

from sqlmesh.core.macros import macro

PARENT_MODELS = ["FEATURES.FIRST"]

@macro()
def my_macro(evaluator, col_name):
    dict_types = evaluator.columns_to_types(PARENT_MODELS[0])
    if dict_types[col_name.name].this == exp.DataType.Type.TEXT:
        return exp.cast("'1'", exp.DataType.build("text"))
    else:
        return exp.parse_one("1")

And then call it in the model's SQL:

@model(
    "FEATURES.SECOND",
    is_sql=True,
    kind={"name": ModelKindName.FULL},
    depends_on=PARENT_MODELS,
)
def entrypoint(evaluator: MacroEvaluator) -> exp.Expression:
    return exp.select("@my_macro('id') AS col")

@georgesittas, @tobymao, I suggest removing the evaluator reference from the Python SQL model interface. I believe it's wrong.

izeigerman avatar Jul 15 '24 17:07 izeigerman

After additional deliberation it turned out that I was wrong about my initial conclusion.

@ananis25, your initial implementation should work with one caveat: the columns_to_types will not be available when the query rendering happens at project load time (as opposed to run time).

In this case, the columns_to_types dictionary will contain a special placeholder key __schema_unavailable_at_load__ indicating the schema's unavailability. You need to add logic to account for this placeholder as part of the model/macro implementation.
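A model entrypoint could guard on that key roughly like this (a sketch of the pattern only: column types are shown as plain strings and the query is returned as text, rather than using real MacroEvaluator/sqlglot objects):

```python
SCHEMA_UNAVAILABLE = "__schema_unavailable_at_load__"

def build_query(columns_to_types: dict[str, str]) -> str:
    # At load time the upstream schema may not be known yet; SQLMesh
    # signals this with a placeholder key instead of raising.
    if SCHEMA_UNAVAILABLE in columns_to_types:
        return "SELECT 1"  # a dummy query is fine for load-time rendering
    # At plan/run time the real schema is present and can drive the query.
    if columns_to_types.get("ID") == "TEXT":
        return "SELECT '1' AS col"
    return "SELECT 1 AS col"

print(build_query({SCHEMA_UNAVAILABLE: ""}))  # → SELECT 1
print(build_query({"ID": "TEXT"}))            # → SELECT '1' AS col
```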

Apologies for providing misleading guidance previously.

@georgesittas, let's use this issue as an opportunity to document this behavior to prevent confusion in the future.

izeigerman avatar Jul 16 '24 20:07 izeigerman

Thanks. Toby suggested the same thing, but the schema of my actual model really depends on the upstream column types/comments. Returning a dummy query at load time (SELECT 1 or None) and the actual query later raised SQLMesh errors elsewhere.

I got around it by

  • Prefixing all model file names with their depth in the full DAG - 1_model.sql, 1_model2.py, 2_model3.sql, and so on - which lets us infer the DAG order statically.

  • Creating a context object that only includes the dependencies of the current model, and leveraging the SQLMesh methods on it.

(Edited: cleaner impl)

def fetch_column_types_and_comments(
    source_name: str, parent_model_names: list[str], self_file_path: str
) -> list["ModelData"]:
    """Fetch the column types and comments for SQLMesh models upstream of the caller. Used 
    to create dynamic models.

    Args:
        source_name: The name of the source.
        parent_model_names: The names of the models we want to fetch the schema for.
        self_file_path: Path of the calling model file. Its DAG-level prefix determines
            which files to skip (that level and downstream) when constructing the
            context used to fetch the model schemas. Critical, so we don't end up in a
            recursive loop. Link - https://github.com/TobikoData/sqlmesh/issues/2900

    Returns:
        A list of ModelData objects (column types and comments per model).
    """

    self_dag_level = int(os.path.basename(self_file_path).split("_", maxsplit=1)[0])
    skip_patterns = [f"{n}_*" for n in range(self_dag_level, MAX_DAG_DEPTH)]
    fake_ctx = make_sqlmesh_context(source_name, make_dummy_config(skip_patterns))

    list_model_info = []
    for model_name in parent_model_names:
        model = fake_ctx.get_model(model_name)
        assert model is not None
        assert model.columns_to_types is not None

        types = {
            k: v.sql(dialect=fake_ctx.default_dialect)
            for k, v in model.columns_to_types.items()
        }
        descr = model.column_descriptions

        list_model_info.append(
            ModelData(
                name=model_name,
                columns_to_sql_types=dict(sorted(types.items(), key=lambda x: x[0])),
                columns_to_comments=dict(sorted(descr.items(), key=lambda x: x[0])),
            )
        )

    return list_model_info
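The depth-prefix trick from the first bullet can be checked in isolation (a standalone extraction of the pattern parsing above; MAX_DAG_DEPTH is a project constant assumed here to be 10):

```python
import os

MAX_DAG_DEPTH = 10  # assumed upper bound on DAG depth

def skip_patterns_for(self_file_path: str) -> list[str]:
    # Model files are named "<depth>_<name>.sql" / "<depth>_<name>.py".
    # Everything at the caller's depth or deeper is excluded from the
    # throwaway context so loading it can't recurse back into the caller.
    level = int(os.path.basename(self_file_path).split("_", maxsplit=1)[0])
    return [f"{n}_*" for n in range(level, MAX_DAG_DEPTH)]

print(skip_patterns_for("models/2_model3.sql")[:3])  # → ['2_*', '3_*', '4_*']
```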

ananis25 avatar Jul 17 '24 05:07 ananis25

I've had some time to look at this now, and the suggestion does work. You just need to check whether the placeholder is in the dict and return a dummy query if it is. Your example worked with this check.

from sqlglot import exp
from sqlmesh.core.macros import MacroEvaluator
from sqlmesh.core.model import model
from sqlmesh.core.model.kind import ModelKindName

PARENT_MODELS = ["FEATURES.FIRST"]

@model(
    "FEATURES.SECOND",
    is_sql=True,
    kind={"name": ModelKindName.FULL},
    depends_on=PARENT_MODELS,
)
def entrypoint(evaluator: MacroEvaluator) -> exp.Expression:
    dict_types = evaluator.columns_to_types(PARENT_MODELS[0])
    if "__schema_unavailable_at_load__" in dict_types:
        return exp.select("1")
    return exp.select("'1'::int as ID")

tobymao avatar Aug 22 '24 18:08 tobymao