Spark engine using Glue catalog with Iceberg on S3 does not support views / virtual data environments
I am trying to use AWS Glue as the data catalog for Iceberg tables, with Spark as the execution engine.
Here is my config.yml:
local:
  connection:
    type: spark
    config:
      spark.master: local[*]
      spark.app.name: SQLMesh-GlueOnly
      spark.driver.memory: 4g
      spark.executor.memory: 4g
      spark.jars: >
        ./jars/iceberg-spark-runtime-3.4_2.12-1.9.2.jar,
        ./jars/iceberg-aws-bundle-1.9.2.jar,
        ./jars/bundle-2.28.17.jar,
        ./jars/url-connection-client-2.28.17.jar,
        ./jars/hadoop-aws-3.3.4.jar,
        ./jars/aws-java-sdk-bundle-1.12.767.jar
      spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
      spark.sql.catalog.glue_catalog: org.apache.iceberg.spark.SparkCatalog
      spark.sql.catalog.glue_catalog.catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog
      spark.sql.catalog.glue_catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO
      spark.sql.catalog.glue_catalog.warehouse: s3://dl-poc-helper-dev/sqlmesh/warehouse/
      spark.sql.catalog.glue_catalog.glue.region: eu-west-1
      spark.sql.defaultCatalog: glue_catalog
      spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
      spark.hadoop.fs.s3a.aws.credentials.provider: >
        org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider,
        software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider
      spark.hadoop.fs.s3a.endpoint.region: eu-west-1
  state_connection:
    type: duckdb
    database: ./sqlmesh_state.db

default_gateway: local

model_defaults:
  dialect: spark
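For reference, the Glue catalog itself is reachable from a plain PySpark session with these same settings; a minimal sanity-check sketch (the app name and the smoke_test table are arbitrary examples, and the jars from spark.jars above still need to be on the classpath):

from pyspark.sql import SparkSession

# Minimal sketch: same glue_catalog settings as the SQLMesh config above.
# The Iceberg/AWS jars from spark.jars must be on the classpath, and AWS
# credentials must be available in the environment.
spark = (
    SparkSession.builder
    .appName("glue-catalog-smoke-test")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://dl-poc-helper-dev/sqlmesh/warehouse/")
    .config("spark.sql.catalog.glue_catalog.glue.region", "eu-west-1")
    .getOrCreate()
)

# Table operations against the Glue catalog work fine.
spark.sql("SHOW NAMESPACES IN glue_catalog").show()
spark.sql("CREATE TABLE IF NOT EXISTS glue_catalog.default.smoke_test (id INT) USING iceberg")
spark.sql("SELECT COUNT(*) FROM glue_catalog.default.smoke_test").show()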
A simple model to test:
MODEL (
name glue_catalog.default.simple_sales,
kind FULL,
dialect spark
);
SELECT
1 as id,
'Product A' as product_name,
100.0 as amount,
date('2024-01-01') as sale_date,
current_timestamp() as created_at;
This throws the following error:
./scripts/setup_env.sh sqlmesh --debug plan dev
Getting AWS credentials from SSO profile...
✅ AWS credentials set
AWS_ACCESS_KEY_ID: .........
AWS_DEFAULT_REGION: eu-west-1
25/09/12 09:18:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/12 09:18:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[09:18:59] Test Results: <sqlmesh.core.test.result.ModelTextTestResult run=0 errors=0 failures=0> console.py:3815
The duckdb state connection is configured for single threaded mode but the warehouse connection is configured for multi threaded mode with 4 concurrent tasks. This can cause SQLMesh to hang. Overriding the duckdb state console.py:3815
connection config to use multi threaded mode.
The duckdb engine is not recommended for storing SQLMesh state in production deployments. Please see https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#state-connection for a list of recommended engines and console.py:3815
more information.
[09:19:00] Environment Difference Summary: console.py:3815
Model Difference Summary: console.py:3815
Models needing backfill:
└── default__dev.simple_sales: [full refresh]
Apply - Backfill Tables [y/n]: y
[09:19:08] Starting plan 9831bdc10ed64c468b2bb1d3e4dfd021 console.py:3815
[09:19:09] Starting evaluation for 1 snapshots console.py:3815
Evaluating "glue_catalog"."default"."simple_sales" console.py:3815
[09:19:10] Evaluated "glue_catalog"."default"."simple_sales" | batch=0 | duration=923ms | num_audits_passed=0 | num_audits_failed=0 console.py:3815
Stopping evaluation with success=True console.py:3815
Starting promotion for 1 snapshots console.py:3815
Stopping promotion with success=False console.py:3815
Stopping plan console.py:3815
Traceback (most recent call last):
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/utils/concurrency.py", line 69, in _process_node
self.fn(node)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/utils/concurrency.py", line 172, in <lambda>
lambda s_id: fn(snapshots_by_id[s_id]),
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/snapshot/evaluator.py", line 311, in <lambda>
lambda s: self._promote_snapshot(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/snapshot/evaluator.py", line 1184, in _promote_snapshot
_evaluation_strategy(snapshot, adapter).promote(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/snapshot/evaluator.py", line 1743, in promote
self.adapter.create_view(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/engine_adapter/shared.py", line 312, in internal_wrapper
return func(*list_args, **kwargs)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/engine_adapter/base.py", line 1309, in create_view
self.execute(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/engine_adapter/base.py", line 2443, in execute
self._execute(sql, track_rows_processed, **kwargs)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/engine_adapter/base.py", line 2475, in _execute
self.cursor.execute(sql, **kwargs)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/engines/spark/db_api/spark_session.py", line 27, in execute
self._last_df = self._spark.sql(query)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/pyspark/sql/session.py", line 1440, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 169, in deco
return f(*a, **kw)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o54.sql.
: java.lang.UnsupportedOperationException: Replacing a view is not supported by catalog: glue_catalog
at org.apache.iceberg.spark.SparkCatalog.replaceView(SparkCatalog.java:650)
at org.apache.spark.sql.execution.datasources.v2.CreateV2ViewExec.replaceView(CreateV2ViewExec.scala:103)
at org.apache.spark.sql.execution.datasources.v2.CreateV2ViewExec.run(CreateV2ViewExec.scala:64)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:218)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:95)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/bin/sqlmesh", line 7, in <module>
sys.exit(cli())
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/core.py", line 1161, in __call__
return self.main(*args, **kwargs)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/core.py", line 1082, in main
rv = self.invoke(ctx)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/core.py", line 1697, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/core.py", line 1443, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/core.py", line 788, in invoke
return __callback(*args, **kwargs)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/decorators.py", line 33, in new_func
return f(get_current_context(), *args, **kwargs)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/cli/__init__.py", line 29, in wrapper
return handler(sqlmesh_context, lambda: func(*args, **kwargs))
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/cli/__init__.py", line 51, in _debug_exception_handler
return func()
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/cli/__init__.py", line 29, in <lambda>
return handler(sqlmesh_context, lambda: func(*args, **kwargs))
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/analytics/__init__.py", line 82, in wrapper
return func(*args, **kwargs)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/cli/main.py", line 561, in plan
context.plan(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/analytics/__init__.py", line 110, in wrapper
return func(*args, **kwargs)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/context.py", line 1392, in plan
self.console.plan(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/console.py", line 1853, in plan
self._show_options_after_categorization(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/console.py", line 1982, in _show_options_after_categorization
plan_builder.apply()
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/plan/builder.py", line 273, in apply
self._apply(self.build())
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/context.py", line 1736, in apply
raise e
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/context.py", line 1727, in apply
self._apply(plan, circuit_breaker)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/context.py", line 2519, in _apply
self._scheduler.create_plan_evaluator(self).evaluate(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/plan/evaluator.py", line 107, in evaluate
self._evaluate_stages(plan_stages, plan)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/plan/evaluator.py", line 127, in _evaluate_stages
handler(stage, plan)
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/plan/evaluator.py", line 352, in visit_virtual_layer_update_stage
self._promote_snapshots(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/plan/evaluator.py", line 387, in _promote_snapshots
self.snapshot_evaluator.promote(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/snapshot/evaluator.py", line 309, in promote
concurrent_apply_to_snapshots(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/utils/concurrency.py", line 170, in concurrent_apply_to_snapshots
return concurrent_apply_to_dag(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/utils/concurrency.py", line 207, in concurrent_apply_to_dag
return ConcurrentDAGExecutor(
File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/utils/concurrency.py", line 64, in run
self._finished_future.result()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 445, in result
return self.__get_result()
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
raise self._exception
sqlmesh.utils.concurrency.NodeExecutionFailedError: Execution failed for node SnapshotId<"glue_catalog"."default"."simple_sales": 668930965>
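From the traceback, the physical evaluation succeeds (the Iceberg table is created and populated), and the failure happens at the promotion step, where SQLMesh builds the virtual environment by creating a view over the physical snapshot table. Roughly the kind of statement that appears to be issued at that point (a sketch only; the physical schema and snapshot suffix are illustrative placeholders, not SQLMesh's exact naming):

# Sketch of the promotion step, based on the create_view call in the traceback.
# `spark` is a SparkSession configured like the sanity-check snippet above.
physical_table = "glue_catalog.sqlmesh__default.simple_sales__1234567890"  # placeholder name

spark.sql(f"""
    CREATE OR REPLACE VIEW glue_catalog.default__dev.simple_sales AS
    SELECT * FROM {physical_table}
""")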
I believe this is caused by the following lines:
https://github.com/TobikoData/sqlmesh/blob/7676aa1d375eebfce8487f617a31bfe87c026fc3/sqlmesh/core/engine_adapter/spark.py#L55
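The same statement shape can be tried outside SQLMesh with a bare CREATE OR REPLACE VIEW against glue_catalog; if it fails the same way, that would point to the Iceberg Glue catalog's lack of Spark view support rather than to SQLMesh itself. A minimal sketch (the view name is arbitrary):

# Minimal repro attempt outside SQLMesh, using a session configured as above.
spark.sql("CREATE OR REPLACE VIEW glue_catalog.default.any_view AS SELECT 1 AS id")
# Expected, per the traceback:
# java.lang.UnsupportedOperationException:
#   Replacing a view is not supported by catalog: glue_catalog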
If I change this to use the default spark_catalog instead of glue_catalog, the data does get inserted, but there are two issues:
- It inserts duplicate records (a quick check is sketched after this list).
- Virtual data environment creation gets skipped, so it can only deploy to prod (but that still results in duplicates).
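A quick way to see the duplicates (a sketch; the table name assumes the model materializes as default.simple_sales in the session catalog, so adjust if the physical table lands elsewhere):

# Duplicate check for the first issue above; `spark` as in the earlier snippets.
spark.sql("""
    SELECT id, product_name, COUNT(*) AS copies
    FROM default.simple_sales
    GROUP BY id, product_name
    HAVING COUNT(*) > 1
""").show()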
What am I missing? If anyone could shed some light on this, I'd appreciate it.
Thanks