
Spark engine using Glue catalog with Iceberg on S3 does not support views / virtual data environments

Open vipulr8 opened this issue 5 months ago • 0 comments

I am trying to use AWS Glue as the data catalog to create Iceberg tables, using Spark as the execution engine.

Here is my config.yml:

  local:
    connection:
      type: spark
      config:
        spark.master: local[*]
        spark.app.name: SQLMesh-GlueOnly
        spark.driver.memory: 4g
        spark.executor.memory: 4g
        
        spark.jars: >
          ./jars/iceberg-spark-runtime-3.4_2.12-1.9.2.jar,
          ./jars/iceberg-aws-bundle-1.9.2.jar,
          ./jars/bundle-2.28.17.jar,
          ./jars/url-connection-client-2.28.17.jar,
          ./jars/hadoop-aws-3.3.4.jar,
          ./jars/aws-java-sdk-bundle-1.12.767.jar
        

        spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
        
        spark.sql.catalog.glue_catalog: org.apache.iceberg.spark.SparkCatalog
        spark.sql.catalog.glue_catalog.catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog
        spark.sql.catalog.glue_catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO
        spark.sql.catalog.glue_catalog.warehouse: s3://dl-poc-helper-dev/sqlmesh/warehouse/
        spark.sql.catalog.glue_catalog.glue.region: eu-west-1
        
        spark.sql.defaultCatalog: glue_catalog
        
        spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
        spark.hadoop.fs.s3a.aws.credentials.provider: >
          org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider,
          software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider
        spark.hadoop.fs.s3a.endpoint.region: eu-west-1
        
    state_connection:
      type: duckdb
      database: ./sqlmesh_state.db

default_gateway: local

model_defaults:
  dialect: spark
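
As a side note, the same catalog wiring can be sanity-checked outside SQLMesh with a plain PySpark session. This is a minimal sketch under the assumption that the jars listed above are on the classpath and AWS credentials are exported; the app name is arbitrary:

# Minimal sketch: reproduce the Glue/Iceberg catalog settings from config.yml in a plain
# PySpark session to confirm Glue connectivity independently of SQLMesh.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("glue-iceberg-check")  # arbitrary name
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://dl-poc-helper-dev/sqlmesh/warehouse/")
    .config("spark.sql.catalog.glue_catalog.glue.region", "eu-west-1")
    .config("spark.sql.defaultCatalog", "glue_catalog")
    .getOrCreate()
)

# Listing namespaces exercises the Glue connection without touching any tables.
spark.sql("SHOW NAMESPACES IN glue_catalog").show()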


A simple model to test:

MODEL (
  name glue_catalog.default.simple_sales,
  kind FULL,
  dialect spark
);

SELECT
  1 as id,
  'Product A' as product_name,
  100.0 as amount,
  date('2024-01-01') as sale_date,
  current_timestamp() as created_at;

This throws the following error:

./scripts/setup_env.sh sqlmesh --debug plan dev
Getting AWS credentials from SSO profile...
✅ AWS credentials set
AWS_ACCESS_KEY_ID: .........
AWS_DEFAULT_REGION: eu-west-1
25/09/12 09:18:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/12 09:18:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[09:18:59] Test Results: <sqlmesh.core.test.result.ModelTextTestResult run=0 errors=0 failures=0>  console.py:3815
           The duckdb state connection is configured for single threaded mode but the warehouse connection is configured for multi threaded mode with 4 concurrent tasks. This can cause SQLMesh to hang. Overriding the duckdb state connection config to use multi threaded mode.  console.py:3815
           The duckdb engine is not recommended for storing SQLMesh state in production deployments. Please see https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#state-connection for a list of recommended engines and more information.  console.py:3815
[09:19:00] Environment Difference Summary:  console.py:3815
           Model Difference Summary:  console.py:3815
Models needing backfill:
└── default__dev.simple_sales: [full refresh]
Apply - Backfill Tables [y/n]: y
[09:19:08] Starting plan 9831bdc10ed64c468b2bb1d3e4dfd021  console.py:3815
[09:19:09] Starting evaluation for 1 snapshots  console.py:3815
           Evaluating "glue_catalog"."default"."simple_sales"  console.py:3815
[09:19:10] Evaluated "glue_catalog"."default"."simple_sales" | batch=0 | duration=923ms | num_audits_passed=0 | num_audits_failed=0  console.py:3815
           Stopping evaluation with success=True  console.py:3815
           Starting promotion for 1 snapshots  console.py:3815
           Stopping promotion with success=False  console.py:3815
           Stopping plan  console.py:3815
Traceback (most recent call last):
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/utils/concurrency.py", line 69, in _process_node
    self.fn(node)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/utils/concurrency.py", line 172, in <lambda>
    lambda s_id: fn(snapshots_by_id[s_id]),
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/snapshot/evaluator.py", line 311, in <lambda>
    lambda s: self._promote_snapshot(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/snapshot/evaluator.py", line 1184, in _promote_snapshot
    _evaluation_strategy(snapshot, adapter).promote(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/snapshot/evaluator.py", line 1743, in promote
    self.adapter.create_view(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/engine_adapter/shared.py", line 312, in internal_wrapper
    return func(*list_args, **kwargs)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/engine_adapter/base.py", line 1309, in create_view
    self.execute(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/engine_adapter/base.py", line 2443, in execute
    self._execute(sql, track_rows_processed, **kwargs)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/engine_adapter/base.py", line 2475, in _execute
    self.cursor.execute(sql, **kwargs)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/engines/spark/db_api/spark_session.py", line 27, in execute
    self._last_df = self._spark.sql(query)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/pyspark/sql/session.py", line 1440, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 169, in deco
    return f(*a, **kw)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o54.sql.
: java.lang.UnsupportedOperationException: Replacing a view is not supported by catalog: glue_catalog
        at org.apache.iceberg.spark.SparkCatalog.replaceView(SparkCatalog.java:650)
        at org.apache.spark.sql.execution.datasources.v2.CreateV2ViewExec.replaceView(CreateV2ViewExec.scala:103)
        at org.apache.spark.sql.execution.datasources.v2.CreateV2ViewExec.run(CreateV2ViewExec.scala:64)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:218)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:95)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/bin/sqlmesh", line 7, in <module>
    sys.exit(cli())
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/core.py", line 1161, in __call__
    return self.main(*args, **kwargs)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/core.py", line 1082, in main
    rv = self.invoke(ctx)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/core.py", line 1697, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/core.py", line 1443, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/core.py", line 788, in invoke
    return __callback(*args, **kwargs)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/click/decorators.py", line 33, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/cli/__init__.py", line 29, in wrapper
    return handler(sqlmesh_context, lambda: func(*args, **kwargs))
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/cli/__init__.py", line 51, in _debug_exception_handler
    return func()
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/cli/__init__.py", line 29, in <lambda>
    return handler(sqlmesh_context, lambda: func(*args, **kwargs))
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/analytics/__init__.py", line 82, in wrapper
    return func(*args, **kwargs)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/cli/main.py", line 561, in plan
    context.plan(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/analytics/__init__.py", line 110, in wrapper
    return func(*args, **kwargs)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/context.py", line 1392, in plan
    self.console.plan(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/console.py", line 1853, in plan
    self._show_options_after_categorization(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/console.py", line 1982, in _show_options_after_categorization
    plan_builder.apply()
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/plan/builder.py", line 273, in apply
    self._apply(self.build())
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/context.py", line 1736, in apply
    raise e
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/context.py", line 1727, in apply
    self._apply(plan, circuit_breaker)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/context.py", line 2519, in _apply
    self._scheduler.create_plan_evaluator(self).evaluate(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/plan/evaluator.py", line 107, in evaluate
    self._evaluate_stages(plan_stages, plan)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/plan/evaluator.py", line 127, in _evaluate_stages
    handler(stage, plan)
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/plan/evaluator.py", line 352, in visit_virtual_layer_update_stage
    self._promote_snapshots(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/plan/evaluator.py", line 387, in _promote_snapshots
    self.snapshot_evaluator.promote(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/core/snapshot/evaluator.py", line 309, in promote
    concurrent_apply_to_snapshots(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/utils/concurrency.py", line 170, in concurrent_apply_to_snapshots
    return concurrent_apply_to_dag(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/utils/concurrency.py", line 207, in concurrent_apply_to_dag
    return ConcurrentDAGExecutor(
  File "/Users/vrai/Code/Work/sqlmesh-poc/.venv/lib/python3.9/site-packages/sqlmesh/utils/concurrency.py", line 64, in run
    self._finished_future.result()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
sqlmesh.utils.concurrency.NodeExecutionFailedError: Execution failed for node SnapshotId<"glue_catalog"."default"."simple_sales": 668930965>

I believe this is due to the following lines:

https://github.com/TobikoData/sqlmesh/blob/7676aa1d375eebfce8487f617a31bfe87c026fc3/sqlmesh/core/engine_adapter/spark.py#L55
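
For context, my reading of the log above (not verified against the source) is that the promotion step builds the virtual layer by creating views in the environment schema that point at the physical snapshot tables, and that view DDL is what ends up in SparkCatalog.replaceView. Roughly, it issues something like the following, where the environment schema comes from the log and the physical table name is only an illustrative placeholder:

# Illustrative only: approximately the statement the promotion step runs via spark.sql().
# "default__dev" is the environment schema from the log; the physical table name below is
# a placeholder, not the actual name SQLMesh generated.
spark.sql("""
    CREATE OR REPLACE VIEW glue_catalog.default__dev.simple_sales AS
    SELECT * FROM glue_catalog.sqlmesh__default.default__simple_sales__668930965
""")
# -> java.lang.UnsupportedOperationException: Replacing a view is not supported by catalog: glue_catalog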

If I change this to use the default spark_catalog instead of glue_catalog, the data does get inserted, but there are two issues (a quick check contrasting the two catalogs is sketched after this list):

  • It inserts duplicate records.
  • Virtual data environment creation gets skipped, so it can only deploy to PROD (and that still results in duplicates).
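
For reference, the difference in view support between the two catalogs can be confirmed directly in a PySpark shell. This is a minimal check with hypothetical view names, assuming a session with both catalogs registered as above; the built-in session catalog accepts the view DDL while the Iceberg GlueCatalog-backed catalog rejects it:

# Hypothetical view names; "spark" is a session with glue_catalog configured as above.
spark.sql("CREATE OR REPLACE VIEW spark_catalog.default.v_check AS SELECT 1 AS id")  # succeeds
spark.sql("CREATE OR REPLACE VIEW glue_catalog.default.v_check AS SELECT 1 AS id")   # fails: replacing a view is not supported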

What am I missing? If anyone could shed some light on this, I would appreciate it.

Thanks

vipulr8 · Sep 12 '25 07:09