
[Bug]: Python - SQL Transform fails with only one named PCollection

Open experiencedProcrastinator opened this issue 1 year ago • 1 comment

What happened?

Description: When attempting to execute a SQL query on a PCollection named 'data1', the following error occurs:

org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.validate.SqlValidatorException: Object 'data1' not found

Steps to Reproduce:

  1. Initialize an Apache Beam pipeline.
  2. Create a PCollection named 'data1' using the beam.Create and beam.Map transforms.
  3. Attempt to execute a SQL query "SELECT * FROM data1" on the 'data1' PCollection using the SqlTransform transform.
  4. Run the pipeline.

Expected Behavior: The pipeline should execute the SQL query successfully on the 'data1' PCollection without errors.

Actual Behavior: The pipeline throws an error when attempting to execute the SQL query, stating that the object 'data1' is not found.

Error Stack Trace:

RuntimeError: org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse query SELECT * FROM data1
    ...
Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.ValidationException: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.runtime.CalciteContextException: From line 1, column 15 to line 1, column 19: Object 'data1' not found
    ...

Additional Observations:

  • When passing a dictionary containing both 'data1' and another PCollection named 'data2' to the same SQL query, the code runs successfully.
  • Output is generated as expected with no errors in this scenario.

Code Snippets:

Error-causing code snippet:

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

p = beam.Pipeline()
data1 = p | "Create data1" >> beam.Create(['a']) | beam.Map(lambda x: beam.Row(a='a'))
{'data1': data1} | SqlTransform("SELECT * FROM data1") | beam.Map(print)
p.run()

Successful code snippets:

  1. Adding another unused PCollection:
p = beam.Pipeline()
data1 = p | "Create data1" >> beam.Create(['a']) | beam.Map(lambda x: beam.Row(a='a'))
data2 = p | "Create data2" >> beam.Create(['b']) | beam.Map(lambda x: beam.Row(b='b'))
{'data1': data1, 'data2': data2} | SqlTransform("SELECT * FROM data1") | beam.Map(print)
p.run()
  2. Replacing data1 with PCOLLECTION in the SQL query:
p = beam.Pipeline()
data1 = p | "Create data1" >> beam.Create(['a']) | beam.Map(lambda x: beam.Row(a='a'))
{'data1': data1} | SqlTransform("SELECT * FROM PCOLLECTION") | beam.Map(print)
p.run()

Environment:

  • Apache Beam version: 2.53.0
  • Operating System: macOS 14.3 (M1 chip)
  • Python version: 3.11.7

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [X] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [X] Component: Google Cloud Dataflow Runner

It is a bit strange, but checking https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sql_test.py#L119, tagged PCollections are meant for joining. If you only have one PCollection for your SQL, you should just use PCOLLECTION in the query.

So this works:

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

p = beam.Pipeline()
data1 = p | "Create data1" >> beam.Create(["a"]) | beam.Map(lambda x: beam.Row(a="a"))
{"data1": data1} | SqlTransform("SELECT * FROM PCOLLECTION") | beam.Map(print)
p.run()

or

p = beam.Pipeline()
data1 = p | "Create data1" >> beam.Create(["a"]) | beam.Map(lambda x: beam.Row(a="a"))
data1 | SqlTransform("SELECT * FROM PCOLLECTION") | beam.Map(print)
p.run()
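
For reference, the tagged (dict) form is for queries that reference several inputs by name, since each key is exposed to the query as a table name; that is consistent with the observation above that adding the unused data2 made SELECT * FROM data1 work. A minimal sketch of the join case (the table and column names here are illustrative, not from the report):

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# Each dict key becomes a table name the query can reference.
p = beam.Pipeline()
customers = p | "Create customers" >> beam.Create([beam.Row(customer_id=1, customer_name='a')])
orders = p | "Create orders" >> beam.Create([beam.Row(customer_id=1, amount=10)])
{'customers': customers, 'orders': orders} | SqlTransform(
    "SELECT customers.customer_name, orders.amount "
    "FROM customers JOIN orders ON customers.customer_id = orders.customer_id"
) | beam.Map(print)
p.run()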

liferoad commented Feb 17 '24 21:02