BigQueryExampleGen uses cached result for the schema during RowToExample
System information
- Have I specified the code to reproduce the issue (Yes, No): No
- Environment in which the code is executed (e.g., Local(Linux/MacOS/Windows), Interactive Notebook, Google Cloud, etc): Linux
- TensorFlow version: 2.9.1
- TFX Version: 1.9.1
- Python version: 3.8.13
- Python dependencies (from pip freeze output):
absl-py==1.2.0
apache-beam==2.40.0
argon2-cffi==21.3.0
argon2-cffi-bindings==21.2.0
astroid==2.11.7
astunparse==1.6.3
attrs==20.3.0
backcall==0.2.0
beautifulsoup4==4.11.1
bleach==5.0.1
cachetools==4.2.4
certifi==2022.6.15
cffi==1.15.1
charset-normalizer==2.1.0
click==7.1.2
cloudpickle==2.1.0
crcmod==1.7
debugpy==1.6.2
decorator==5.1.1
defusedxml==0.7.1
Deprecated==1.2.13
dill==0.3.1.1
docformatter==1.4
docker==4.4.4
docopt==0.6.2
docstring-parser==0.14.1
entrypoints==0.4
fastavro==1.5.4
fasteners==0.17.3
fastjsonschema==2.16.1
fire==0.4.0
flatbuffers==1.12
gast==0.4.0
google-api-core==1.32.0
google-api-python-client==1.12.11
google-apitools==0.5.31
google-auth==1.35.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.6
google-cloud-aiplatform==1.16.1
google-cloud-bigquery==2.34.4
google-cloud-bigquery-storage==2.14.1
google-cloud-bigtable==1.7.2
google-cloud-core==1.7.3
google-cloud-datastore==1.15.5
google-cloud-dlp==3.8.0
google-cloud-language==1.3.2
google-cloud-pubsub==2.13.4
google-cloud-pubsublite==1.4.2
google-cloud-recommendations-ai==0.2.0
google-cloud-resource-manager==1.6.0
google-cloud-spanner==1.19.3
google-cloud-storage==1.44.0
google-cloud-videointelligence==1.16.3
google-cloud-vision==1.0.2
google-crc32c==1.3.0
google-pasta==0.2.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.4
grpc-google-iam-v1==0.12.4
grpcio==1.47.0
grpcio-gcp==0.2.2
grpcio-status==1.47.0
h5py==3.7.0
hdfs==2.7.0
httplib2==0.20.4
idna==3.3
importlib-metadata==4.12.0
iniconfig==1.1.1
ipykernel==6.15.1
ipython==7.34.0
ipython-genutils==0.2.0
ipywidgets==7.7.1
isort==5.10.1
jedi==0.18.1
Jinja2==3.1.2
joblib==0.14.1
jsonschema==3.2.0
jupyter-client==7.3.4
jupyter-core==4.11.1
jupyterlab-pygments==0.2.2
jupyterlab-widgets==1.1.1
keras==2.9.0
Keras-Preprocessing==1.1.2
keras-tuner==1.1.3
kfp==1.8.13
kfp-pipeline-spec==0.1.16
kfp-server-api==1.8.4
kt-legacy==1.0.4
kubernetes==12.0.1
lazy-object-proxy==1.7.1
libclang==14.0.6
Markdown==3.4.1
MarkupSafe==2.1.1
matplotlib-inline==0.1.3
mccabe==0.7.0
mistune==0.8.4
ml-addons @ file:///home/martin/source/recommend-ranking/third_party/ml_addons/ml-addons
ml-metadata==1.9.0
ml-pipelines-sdk==1.9.1
nbclient==0.6.6
nbconvert==6.5.0
nbformat==5.4.0
nest-asyncio==1.5.5
notebook==6.4.12
numpy==1.22.4
oauth2client==4.1.3
oauthlib==3.2.0
opt-einsum==3.3.0
orjson==3.7.11
overrides==6.2.0
packaging==20.9
pandas==1.4.3
pandocfilters==1.5.0
parso==0.8.3
pexpect==4.8.0
pickleshare==0.7.5
platformdirs==2.5.2
pluggy==1.0.0
portpicker==1.5.2
prometheus-client==0.14.1
prompt-toolkit==3.0.30
proto-plus==1.20.6
protobuf==3.19.4
psutil==5.9.1
ptyprocess==0.7.0
py==1.11.0
pyarrow==5.0.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.21
pydantic==1.9.1
pydot==1.4.2
pyfarmhash==0.3.2
Pygments==2.12.0
pylint==2.14.5
pymongo==3.12.3
pyparsing==2.4.2
pyrsistent==0.18.1
pytest==7.1.2
python-dateutil==2.8.2
python-snappy==0.6.1
pytz==2022.1
PyYAML==5.4.1
pyzmq==23.2.0
requests==2.28.1
requests-oauthlib==1.3.1
requests-toolbelt==0.9.1
rsa==4.9
scipy==1.9.0
Send2Trash==1.8.0
six==1.16.0
slack-sdk==3.18.1
soupsieve==2.3.2.post1
strip-hints==0.1.10
tabulate==0.8.10
tensorboard==2.9.1
tensorboard-data-server==0.6.1
tensorboard-plugin-wit==1.8.1
tensorflow==2.9.1
tensorflow-addons==0.17.1
tensorflow-data-validation==1.9.0
tensorflow-estimator==2.9.0
tensorflow-hub==0.12.0
tensorflow-io-gcs-filesystem==0.26.0
tensorflow-metadata==1.9.0
tensorflow-model-analysis==0.40.0
tensorflow-serving-api==2.9.1
tensorflow-text==2.9.0
tensorflow-transform==1.9.0
termcolor==1.1.0
terminado==0.15.0
tfx==1.9.1
tfx-bsl==1.9.0
tinycss2==1.1.1
tomli==2.0.1
tomlkit==0.11.2
tornado==6.2
traitlets==5.3.0
typeguard==2.13.3
typer==0.6.1
typing_extensions==4.3.0
untokenize==0.1.1
uritemplate==3.0.1
urllib3==1.26.11
wcwidth==0.2.5
webencodings==0.5.1
websocket-client==1.3.3
Werkzeug==2.2.1
widgetsnbextension==3.6.1
wrapt==1.14.1
yapf==0.32.0
zipp==3.8.1
Describe the current behavior
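I got a KeyError at google_cloud_big_query/utils.py#L70 after adding a new column to the existing table that ExampleGen reads. Because I build that table with a separate query, the ExampleGen input query itself is unchanged from the previous run.
Looking at the executor of the BigQueryExampleGen component, it retrieves the schema of the query result from BigQuery's cached results (I checked the query_job.cache_hit field). That would explain the error: the rows contain the new column, but the cached schema used to look up column types does not.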
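The traceback is consistent with a column-name-to-type map that is built once from the query schema and then used to look up every key of every row. A minimal pure-Python sketch of that failure mode follows; `row_to_feature_types` is a hypothetical stand-in for the `field_to_type[key]` lookup shown in the traceback (not TFX code), and the columns `user_id` and `score` are made-up examples.

```python
from typing import Any, Dict


def row_to_feature_types(field_to_type: Dict[str, str],
                         row: Dict[str, Any]) -> Dict[str, str]:
    """Hypothetical stand-in for the `field_to_type[key]` lookup.

    Every column present in the row must also exist in the type map built
    from the query schema; a missing column raises KeyError.
    """
    return {key: field_to_type[key] for key in row}


# Type map built from a (cached) schema captured before the column existed.
stale_type_map = {"user_id": "STRING", "score": "FLOAT"}

# Row read from the table after the new column was added.
fresh_row = {"user_id": "u1", "score": 0.5, "ctx_noti_keywords": ["a", "b"]}

try:
    row_to_feature_types(stale_type_map, fresh_row)
except KeyError as err:
    print(f"KeyError: {err}")  # KeyError: 'ctx_noti_keywords'
```

Rebuilding the map from an up-to-date schema (one that includes `ctx_noti_keywords`) makes the same row convert without error.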
Describe the expected behavior
I want the executor to get the schema from a fresh query result, not a cached one. I expect the change to be small and self-contained: just disable the query cache for the schema query, e.g.:
bigquery.job.QueryJobConfig(use_query_cache=False)
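A minimal sketch of what such a change could look like, assuming the schema is fetched with a row-less `SELECT * FROM (<query>) LIMIT 0` probe (an assumption, not necessarily what the TFX executor does). `build_type_map` is a hypothetical helper; `client.query(..., job_config=...)`, `QueryJob.result()`, `QueryJob.cache_hit`, and `SchemaField.name` / `SchemaField.field_type` are standard google-cloud-bigquery calls. The client is passed in so the logic can be exercised without BigQuery credentials.

```python
from typing import Any, Dict, Optional


def build_type_map(client: Any, query: str,
                   job_config: Optional[Any] = None) -> Dict[str, str]:
    """Return {column name: BigQuery type name} for the result of `query`.

    `client` is assumed to behave like google.cloud.bigquery.Client. Pass
    job_config=bigquery.QueryJobConfig(use_query_cache=False) so BigQuery
    re-runs the query instead of answering from its result cache.
    """
    # LIMIT 0 asks only for the result schema, not for any rows.
    query_job = client.query(f"SELECT * FROM ({query}) LIMIT 0",
                             job_config=job_config)
    results = query_job.result()  # blocks until the job finishes
    # cache_hit is True when BigQuery served the result from its cache.
    if query_job.cache_hit:
        print("warning: schema came from a cached query result")
    return {field.name: field.field_type for field in results.schema}
```

With the real library this would be called as `build_type_map(bigquery.Client(), query, bigquery.QueryJobConfig(use_query_cache=False))`. Disabling the cache costs one extra (row-less) query per run, which is usually negligible next to reading the table itself.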
Standalone code to reproduce the issue
Other info / logs
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1845, in <lambda>
wrapper = lambda x: [fn(x)]
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/extensions/google_cloud_big_query/example_gen/executor.py", line 48, in RowToExample
def RowToExample(self, instance: Dict[str, Any]) -> tf.train.Example:
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/extensions/google_cloud_big_query/utils.py", line 70, in row_to_example
data_type = field_to_type[key]
KeyError: 'ctx_noti_keywords'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/martin/.cache/bazel/_bazel_martin/8388c24d80d0468dd842adaf3f43ce54/execroot/recommend-ranking/bazel-out/k8-fastbuild/bin/recommend_ranking/bin/run_pipeline.runfiles/recommend-ranking/recommend_ranking/bin/run_pipeline.py", line 172, in <module>
app.run(run_pipeline)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/absl/app.py", line 308, in run
_run_main(main, args)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/absl/app.py", line 254, in _run_main
sys.exit(main(argv))
File "/home/martin/.cache/bazel/_bazel_martin/8388c24d80d0468dd842adaf3f43ce54/execroot/recommend-ranking/bazel-out/k8-fastbuild/bin/recommend_ranking/bin/run_pipeline.runfiles/recommend-ranking/recommend_ranking/bin/run_pipeline.py", line 45, in run_pipeline
_run_pipeline_local(pipeline, pipeline_config.env_config.local_config)
File "/home/martin/.cache/bazel/_bazel_martin/8388c24d80d0468dd842adaf3f43ce54/execroot/recommend-ranking/bazel-out/k8-fastbuild/bin/recommend_ranking/bin/run_pipeline.runfiles/recommend-ranking/recommend_ranking/bin/run_pipeline.py", line 56, in _run_pipeline_local
runner.run(pipeline)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/portable/tfx_runner.py", line 124, in run
return self.run_with_ir(pipeline_pb, run_options=run_options_pb, **kwargs)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/local/local_dag_runner.py", line 109, in run_with_ir
component_launcher.launch()
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 571, in launch
executor_output = self._run_executor(execution_info)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/portable/launcher.py", line 446, in _run_executor
executor_output = self._executor_operator.run_executor(execution_info)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 98, in run_executor
return python_executor_operator.run_with_executor(execution_info, executor)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 58, in run_with_executor
result = executor.Do(execution_info.input_dict, output_dict,
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/components/example_gen/base_example_gen_executor.py", line 278, in Do
(example_split
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
self.result = self.run()
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 574, in run
return self.runner.run_pipeline(self, self._options)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 199, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 442, in run_stages
bundle_results = self._execute_bundle(
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _execute_bundle
self._run_bundle(
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 999, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1309, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
response = self.worker.do_instruction(request)
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
return getattr(self, request_type)(
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 1021, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/worker/operations.py", line 1030, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/common.py", line 1432, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
File "apache_beam/runners/common.py", line 817, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1845, in <lambda>
wrapper = lambda x: [fn(x)]
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/extensions/google_cloud_big_query/example_gen/executor.py", line 48, in RowToExample
def RowToExample(self, instance: Dict[str, Any]) -> tf.train.Example:
File "/home/martin/source/recommend-ranking/.venv/lib/python3.8/site-packages/tfx/extensions/google_cloud_big_query/utils.py", line 70, in row_to_example
data_type = field_to_type[key]
KeyError: "ctx_noti_keywords [while running 'InputToRecord/ToTFExample']"
TFX has its own cache: if all input artifacts and exec properties of a component are the same as in a previous run (in this case, the only input is the query), TFX reuses the result of that component's last execution. You can turn off the TFX cache when creating the pipeline object.
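To illustrate why an unchanged query string leads to reuse, here is a deliberately simplified toy model of component-level caching. It is not TFX's implementation (the real cache key also depends on things like the component and executor definitions); `ToyExecutionCache` and its methods are hypothetical names used only for this sketch.

```python
import hashlib
import json
from typing import Any, Callable, Dict

Executor = Callable[[Dict[str, Any], Dict[str, Any]], Any]


class ToyExecutionCache:
    """Toy model: reuse outputs when inputs and exec properties repeat."""

    def __init__(self) -> None:
        self._outputs: Dict[str, Any] = {}

    @staticmethod
    def _key(inputs: Dict[str, Any], exec_properties: Dict[str, Any]) -> str:
        # Stable fingerprint of everything the toy cache considers.
        payload = json.dumps({"inputs": inputs, "exec": exec_properties},
                             sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def run(self, executor: Executor, inputs: Dict[str, Any],
            exec_properties: Dict[str, Any], enable_cache: bool = True) -> Any:
        key = self._key(inputs, exec_properties)
        if enable_cache and key in self._outputs:
            return self._outputs[key]  # cache hit: executor is skipped
        outputs = executor(inputs, exec_properties)
        self._outputs[key] = outputs
        return outputs
```

In this model, changing the underlying table without changing the query string produces the same key, so the old outputs come back unless `enable_cache=False`. In a real TFX pipeline the corresponding switch is the `enable_cache` argument of the pipeline object (e.g. `tfx.dsl.Pipeline(..., enable_cache=False)` with `from tfx import v1 as tfx`).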
@mhsong21,
You can set enable_cache=False when creating the pipeline object in your my_pipeline.py file, as mentioned in the comment above. Please refer to Build a custom pipeline for more info on pipeline customisation.
Thank you!