Pickling error for custom model kind
SQLMesh Version: 0.174.2 (I believe the problem still persists in newer versions)
Dialect: Trino
State: Postgres
I created a custom materialization to override the default use of MERGE for INCREMENTAL_BY_UNIQUE_KEY in Trino, for performance reasons. I did want to preserve the sequential behaviour, though, so I created a custom model kind for it.
The custom classes live in the same project as the models. I've hit the error both in unit tests and when running in Airflow; when running the process with sqlmesh run locally, the error does not appear.
../.venv/lib/python3.12/site-packages/sqlmesh/core/context.py:444: in __init__
self.load()
../.venv/lib/python3.12/site-packages/sqlmesh/core/context.py:589: in load
loaded_projects = [loader.load() for loader in self._loaders]
../.venv/lib/python3.12/site-packages/sqlmesh/core/loader.py:126: in load
models = self._load_models(
../.venv/lib/python3.12/site-packages/sqlmesh/core/loader.py:428: in _load_models
sql_models = self._load_sql_models(macros, jinja_macros, audits, signals, cache)
../.venv/lib/python3.12/site-packages/sqlmesh/core/loader.py:489: in _load_sql_models
for model in cache.get_or_load_models(path, _load):
../.venv/lib/python3.12/site-packages/sqlmesh/core/loader.py:760: in get_or_load_models
models = self._model_cache.get_or_load(
../.venv/lib/python3.12/site-packages/sqlmesh/core/model/cache.py:70: in get_or_load
self._file_cache.put(name, entry_id, value=models)
def put(self, name: str, entry_id: str = "", *, value: T) -> None:
"""Stores the given value in the cache.
Args:
name: The name of the entry.
entry_id: The unique entry identifier. Used for cache invalidation.
value: The value to store in the cache.
"""
self._path.mkdir(parents=True, exist_ok=True)
if not self._path.is_dir():
raise SQLMeshError(f"Cache path '{self._path}' is not a directory.")
with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd:
> pickle.dump(value, fd)
E _pickle.PicklingError: Can't pickle <class 'materializations.delsert.DelSertKind'>: it's not the same object as materializations.delsert.DelSertKind
Apparently pickle enforces object identity when serialising a class: it looks the class up by its qualified name and requires the result to be the very same object. Because the DelSertKind held in the custom materialization cache is not the same object as the one importable from materializations.delsert at dump time, this error is raised.
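For anyone else hitting this, the identity check is easy to reproduce outside SQLMesh. A minimal sketch (the module path just mirrors this project, and reload stands in for whatever re-imports the class a second time):
import importlib
import pickle

import materializations.delsert as delsert  # hypothetical stand-in for the project module

OriginalKind = delsert.DelSertKind  # hold on to the class object from the first import
importlib.reload(delsert)           # re-importing creates a brand new class object under the same name

# pickle resolves the class by its qualified name and requires the exact same object:
pickle.dumps(OriginalKind)
# _pickle.PicklingError: Can't pickle <class 'materializations.delsert.DelSertKind'>:
# it's not the same object as materializations.delsert.DelSertKind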
If I remove the custom model kind, the error disappears. I also tried setting MAX_FORK_WORKERS=1, but that didn't help.
There is a workaround I can already employ, which is to import the custom materialization type cache and clear it on each run (shown below). I've only got one use of it so far, so this shouldn't be a big deal.
from airflow.exceptions import AirflowException
from sqlmesh import Context

# SQLMESH_PROJECT_PATH and the CompletionStatus annotation are defined/imported elsewhere in the DAG module.
def run_model(model_name: str) -> None:
    """Will run a specific model."""
    from sqlmesh.core.snapshot import evaluator

    evaluator._custom_materialization_type_cache.clear()  # Temporary workaround for the pickling error

    sqlmesh_context = Context(paths=SQLMESH_PROJECT_PATH)
    completion_status: CompletionStatus = sqlmesh_context.run(
        skip_janitor=True,
        select_models=[model_name],
    )
    if completion_status.is_failure:
        raise AirflowException(f"SQLMesh run failed for model {model_name}")
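For context, a minimal sketch of how a wrapper like this can be wired into a DAG using Airflow's TaskFlow API; the DAG id, schedule and model name are placeholders rather than our actual setup:
import pendulum
from airflow.decorators import dag, task


@dag(
    dag_id="sqlmesh_run",  # placeholder DAG id
    schedule="@hourly",  # placeholder schedule
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
)
def sqlmesh_run_dag() -> None:
    @task
    def run_my_model() -> None:
        # Calls the workaround wrapper above; the model name is a placeholder.
        run_model("analytics.my_delsert_model")

    run_my_model()


sqlmesh_run_dag()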
Can you please share the implementation of your materialization?
Of course, this is a direct copy/paste of what has been written, warts and all.
"""The motivation behind this custom materialisation is because MERGE was extremely slow for us in Trino.
So slow that for our largest table, it was not viable estimated to take 63 days to backfill
requiring a large cluster to do so as it holds a lot of data in memory whilst being in a bottleneck.
https://trino.io/docs/current/develop/supporting-merge.html#merge-redistribution
`To ensure that all inserted rows for a given partition end up on a single node,
the redistribution hash on the partition key/bucket columns is applied to the page partition keys.
As a result of the hash, all rows for a specific partition/bucket hash together,
whether they were MATCHED rows or NOT MATCHED rows.`
The intention is to move from MERGE to DELETE then INSERT.
To make this as performant as possible we're going to allow SQLMesh to insert data for as long as possible before then
switching to DELETE and INSERT.
We must ensure that the batches run sequentially: we cannot allow older data to overwrite newer data by running
in parallel, so the intervals must be applied in chronological order.
Similarly, we must guarantee that Iceberg is not left in a partially updated state, so the delete and then the insert must
be run as part of a single transaction.
"""
import typing as t
import sqlglot.expressions as exp
from sqlmesh import CustomMaterialization
from sqlmesh.core.engine_adapter.base import EngineAdapter
from sqlmesh.core.model.kind import CustomKind
from sqlmesh.utils.date import TimeLike, make_inclusive, to_datetime, to_time_column
if t.TYPE_CHECKING:
from sqlmesh import Model, QueryOrDF
class DelSertKind(CustomKind):
batch_concurrency: t.Literal[1] = 1 # Like Unique Key, run 1 interval at a time
def is_incremental(self) -> bool:
return True
def delsert( # noqa: D103
table_name: str, # ": str" is optional argument typing
query_or_df: "QueryOrDF",
model: "Model",
adapter: EngineAdapter,
**kwargs: t.Any, # noqa: ANN401
) -> None:
start: TimeLike = kwargs["start"]
end: TimeLike = kwargs["end"]
insert_until_time = model.custom_materialization_properties["insert_until_time"]
time_column = model.custom_materialization_properties["time_column"]
source_queries, columns_to_types = adapter._get_source_queries_and_columns_to_types( # noqa: SLF001
query_or_df, model.columns_to_types, target_table=table_name
)
time_column_type = columns_to_types[time_column.name]
# If we have 100% confidence that there are no duplicates before a certain point then we need to just
# ram them in as fast as possible doing inserts
if to_datetime(end) <= to_datetime(insert_until_time):
low, high = [
to_time_column(dt, time_column_type, adapter.dialect) for dt in make_inclusive(start, end, adapter.dialect)
]
where = exp.Between(
this=exp.to_column(time_column),
low=low,
high=high,
)
adapter._insert_overwrite_by_condition(table_name, source_queries, columns_to_types, where, **kwargs) # noqa: SLF001
else:
unique_key: exp.Tuple | exp.Column = model.custom_materialization_properties["unique_key"]
for source_query in source_queries:
with source_query as query, adapter.transaction():
temp_table = adapter._get_temp_table(table_name) # noqa: SLF001
adapter.ctas(temp_table, query, columns_to_types=columns_to_types)
if isinstance(unique_key, exp.Tuple):
unique_key_select = exp.convert(unique_key.expressions, True) # noqa: FBT003
else:
unique_key_select = [unique_key]
try:
delete_where = exp.In(
this=unique_key, query=exp.Subquery(this=exp.select(*unique_key_select).from_(temp_table))
)
# Due to an issue with Iceberg deletes in Trino 468 we need to set these session properties.
# otherwise we will get a "file already exists" error from s3
task_max_writer_count = "task_max_writer_count"
task_concurrency = "task_concurrency"
adapter.execute(
[
f"SET SESSION {task_max_writer_count} = 1",
f"SET SESSION {task_concurrency} = 1",
exp.delete(table_name, delete_where),
]
)
adapter.execute(
[
f"RESET SESSION {task_max_writer_count}", # sqlglot could not parse these
f"RESET SESSION {task_concurrency}", # sqlglot could not parse these
]
)
adapter.execute(exp.insert(exp.select("*").from_(temp_table), into=table_name))
finally:
adapter.drop_table(temp_table)
class DelSert(CustomMaterialization[DelSertKind]):
"""Will delete and insert data in to a model.
Expects the following inputs:
insert_until_time
time_column
unique_key
Before the insert_until_time this model will just insert data like an Incremental By Time Range model kind.
Afterward it does the following:
1. Create temporary table with changes in it.
2. Delete records in target table where primary keys are found.
3. Insert records in target table from temporary table.
4. Drop temporary table.
It has the same constraints as an Incremental By Unique Key model kind in that only one batch can be modelled at a
time.
"""
NAME = "delsert"
def insert( # noqa: D102
self,
table_name: str,
query_or_df: "QueryOrDF",
model: "Model",
is_first_insert: bool, # noqa: FBT001
**kwargs: t.Any, # noqa: ANN401
) -> None:
if is_first_insert:
self._replace_query_for_model(model, table_name, query_or_df, **kwargs)
else:
delsert(table_name, query_or_df, model, self.adapter, **kwargs)
def append( # noqa: D102
self,
table_name: str,
query_or_df: "QueryOrDF",
model: "Model",
**kwargs: t.Any, # noqa: ANN401
) -> None:
delsert(table_name, query_or_df, model, self.adapter, **kwargs)
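For reference, the DELETE that the else branch ends up issuing renders roughly like this; a quick sqlglot sketch with placeholder table and key names, separate from the materialisation itself:
import sqlglot.expressions as exp

# Placeholder table and key names, purely for illustration.
unique_key = exp.Tuple(expressions=[exp.to_column("id"), exp.to_column("region")])
temp_table = "analytics.my_table__temp_abc123"

delete_where = exp.In(
    this=unique_key,
    query=exp.Subquery(this=exp.select("id", "region").from_(temp_table)),
)
print(exp.delete("analytics.my_table", delete_where).sql(dialect="trino"))
# Roughly: DELETE FROM analytics.my_table
#          WHERE (id, region) IN (SELECT id, region FROM analytics.my_table__temp_abc123)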