[Bug]: Python - SQL Transform fails with only one named PCollection
What happened?
Description: When attempting to execute a SQL query on a PCollection named 'data1', the following error occurs:
org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.validate.SqlValidatorException: Object 'data1' not found
Steps to Reproduce:
- Initialize an Apache Beam pipeline.
- Create a PCollection named 'data1' using the beam.Create and beam.Map transforms.
- Attempt to execute the SQL query "SELECT * FROM data1" on the 'data1' PCollection using the SqlTransform transform.
- Run the pipeline.
Expected Behavior: The pipeline should execute the SQL query successfully on the 'data1' PCollection without errors.
Actual Behavior: The pipeline throws an error when attempting to execute the SQL query, stating that the object 'data1' is not found.
Error Stack Trace:
RuntimeError: org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse query SELECT * FROM data1
...
Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.ValidationException: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.runtime.CalciteContextException: From line 1, column 15 to line 1, column 19: Object 'data1' not found
...
Additional Observations:
- When passing a dictionary containing both 'data1' and another PCollection named 'data2' to the same SQL query, the code runs successfully.
- Output is generated as expected with no errors in this scenario.
Code Snippets:
Error-causing code snippet:
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
p = beam.Pipeline()
data1 = p | "Create data1" >> beam.Create(['a']) | beam.Map(lambda x: beam.Row(a='a'))
{'data1': data1} | SqlTransform("SELECT * FROM data1") | beam.Map(print)
p.run()
Successful code snippets:
- Adding another, unused PCollection:
p = beam.Pipeline()
data1 = p | "Create data1" >> beam.Create(['a']) | beam.Map(lambda x: beam.Row(a='a'))
data2 = p | "Create data2" >> beam.Create(['b']) | beam.Map(lambda x: beam.Row(b='b'))
{'data1': data1, 'data2': data2} | SqlTransform("SELECT * FROM data1") | beam.Map(print)
p.run()
- Replacing data1 with PCOLLECTION in the SQL query:
p = beam.Pipeline()
data1 = p | "Create data1" >> beam.Create(['a']) | beam.Map(lambda x: beam.Row(a='a'))
{'data1': data1} | SqlTransform("SELECT * FROM PCOLLECTION") | beam.Map(print)
p.run()
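Until this is resolved, the behavior observed above could be wrapped in a small helper that picks the table name from the number of inputs. This is only a sketch of a workaround based on what was seen with Beam 2.53.0; resolve_table_name and build_query are hypothetical names, not part of the Beam API.

```python
def resolve_table_name(tagged_inputs, tag):
    """Return the table name to write in the SQL text for `tag`.

    Assumption (from the behavior reported in this issue, Beam 2.53.0):
    a single input is only reachable as PCOLLECTION, even when it is
    passed in a one-entry dict; with two or more entries the dict keys
    are usable as table names.
    """
    if tag not in tagged_inputs:
        raise KeyError(f"unknown tag: {tag!r}")
    return "PCOLLECTION" if len(tagged_inputs) == 1 else tag


def build_query(tagged_inputs, tag, template="SELECT * FROM {table}"):
    """Fill the {table} placeholder of `template` with the resolved name."""
    return template.format(table=resolve_table_name(tagged_inputs, tag))
```

It would be used as inputs | SqlTransform(build_query(inputs, "data1")), where inputs is the dict of PCollections.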
Environment:
- Apache Beam version: 2.53.0
- Operating System: macOS 14.3 (M1 chip)
- Python version: 3.11.7
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner
This is a bit surprising, but judging by https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sql_test.py#L119, tagged PCollections are meant for joins. If your SQL query reads from only one PCollection, just refer to it as PCOLLECTION in the query.
So this works:
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
p = beam.Pipeline()
data1 = p | "Create data1" >> beam.Create(["a"]) | beam.Map(lambda x: beam.Row(a="a"))
{"data1": data1} | SqlTransform("SELECT * FROM PCOLLECTION") | beam.Map(print)
p.run()
or
p = beam.Pipeline()
data1 = p | "Create data1" >> beam.Create(["a"]) | beam.Map(lambda x: beam.Row(a="a"))
data1 | SqlTransform("SELECT * FROM PCOLLECTION") | beam.Map(print)
p.run()
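For comparison, here is a rough sketch of the case tagged PCollections seem to be intended for: a join across two inputs, loosely modeled on the tagged-join test linked above. The column names (user_id, a, b) and the sample rows are made up for illustration, and the explicit int()/str() casts are there on the assumption that they help Beam infer a concrete schema for each Row. Running it needs Java available for the SQL expansion service.

```python
# Query over two tagged inputs; the table names match the dict keys below.
JOIN_QUERY = """
    SELECT data1.user_id AS user_id, data1.a AS a, data2.b AS b
    FROM data1
    JOIN data2 ON data1.user_id = data2.user_id
"""


def run():
    # Imported here so the query above can be inspected without Beam installed.
    import apache_beam as beam
    from apache_beam.transforms.sql import SqlTransform

    with beam.Pipeline() as p:
        data1 = (
            p
            | "Create data1" >> beam.Create([(1, "x"), (2, "y")])
            | "Rows data1" >> beam.Map(
                lambda t: beam.Row(user_id=int(t[0]), a=str(t[1])))
        )
        data2 = (
            p
            | "Create data2" >> beam.Create([(1, "p"), (3, "q")])
            | "Rows data2" >> beam.Map(
                lambda t: beam.Row(user_id=int(t[0]), b=str(t[1])))
        )
        # Each dict key becomes a table name visible to the SQL query.
        (
            {"data1": data1, "data2": data2}
            | SqlTransform(JOIN_QUERY)
            | beam.Map(print)
        )


if __name__ == "__main__":
    run()
```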