No module named 'user_module_0' on Kubeflow using DataflowRunner
If the bug is related to a specific library below, please raise an issue in the respective repo directly:
TensorFlow Data Validation Repo
TensorFlow Model Analysis Repo
System information
- Have I specified the code to reproduce the issue (Yes, No): Yes
- Environment in which the code is executed (e.g., Local(Linux/MacOS/Windows), Interactive Notebook, Google Cloud, etc): Google Colab
- TensorFlow version: 2.8.0
- TFX Version: 1.7.0
- Python version: 3.7.12
- Python dependencies (from pip freeze output):
absl-py==1.0.0
alabaster==0.7.12
albumentations==0.1.12
altair==4.2.0
apache-beam==2.37.0
appdirs==1.4.4
argon2-cffi==21.3.0
argon2-cffi-bindings==21.2.0
arviz==0.11.4
astor==0.8.1
astropy==4.3.1
astunparse==1.6.3
atari-py==0.2.9
atomicwrites==1.4.0
attrs==20.3.0
audioread==2.1.9
autograd==1.3
Babel==2.9.1
backcall==0.2.0
beautifulsoup4==4.6.3
bleach==4.1.0
blis==0.4.1
bokeh==2.3.3
Bottleneck==1.3.4
branca==0.4.2
bs4==0.0.1
CacheControl==0.12.10
cached-property==1.5.2
cachetools==4.2.4
catalogue==1.0.0
certifi==2021.10.8
cffi==1.15.0
cftime==1.6.0
chardet==3.0.4
charset-normalizer==2.0.12
click==7.1.2
cloudpickle==2.0.0
cmake==3.12.0
cmdstanpy==0.9.5
colorcet==3.0.0
colorlover==0.3.0
community==1.0.0b1
contextlib2==0.5.5
convertdate==2.4.0
coverage==3.7.1
coveralls==0.5
crcmod==1.7
cufflinks==0.17.3
cvxopt==1.2.7
cvxpy==1.0.31
cycler==0.11.0
cymem==2.0.6
Cython==0.29.28
daft==0.0.4
dask==2.12.0
datascience==0.10.6
debugpy==1.0.0
decorator==4.4.2
defusedxml==0.7.1
Deprecated==1.2.13
descartes==1.1.0
dill==0.3.1.1
distributed==1.25.3
dlib @ file:///dlib-19.18.0-cp37-cp37m-linux_x86_64.whl
dm-tree==0.1.6
docker==4.4.4
docopt==0.6.2
docstring-parser==0.13
docutils==0.17.1
dopamine-rl==1.0.5
earthengine-api==0.1.303
easydict==1.9
ecos==2.0.10
editdistance==0.5.3
en-core-web-sm @ https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-2.2.5/en_core_web_sm-2.2.5.tar.gz
entrypoints==0.4
ephem==4.1.3
et-xmlfile==1.1.0
fa2==0.3.5
fastai==1.0.61
fastavro==1.4.10
fastdtw==0.3.4
fasteners==0.17.3
fastprogress==1.0.2
fastrlock==0.8
fbprophet==0.7.1
feather-format==0.4.1
filelock==3.6.0
fire==0.4.0
firebase-admin==4.4.0
fix-yahoo-finance==0.0.22
Flask==1.1.4
flatbuffers==2.0
folium==0.8.3
future==0.16.0
gast==0.5.3
GDAL==2.2.2
gdown==4.2.2
gensim==3.6.0
geographiclib==1.52
geopy==1.17.0
gin-config==0.5.0
glob2==0.7
google==2.0.3
google-api-core==1.31.5
google-api-python-client==1.12.11
google-apitools==0.5.31
google-auth==1.35.0
google-auth-httplib2==0.0.4
google-auth-oauthlib==0.4.6
google-cloud-aiplatform==1.11.0
google-cloud-bigquery==2.34.2
google-cloud-bigquery-storage==2.13.0
google-cloud-bigtable==1.7.0
google-cloud-core==1.7.2
google-cloud-datastore==1.8.0
google-cloud-dlp==3.6.2
google-cloud-firestore==1.7.0
google-cloud-language==1.3.0
google-cloud-pubsub==2.11.0
google-cloud-pubsublite==1.4.1
google-cloud-recommendations-ai==0.2.0
google-cloud-spanner==1.19.1
google-cloud-storage==1.44.0
google-cloud-translate==1.5.0
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-colab @ file:///colabtools/dist/google-colab-1.0.0.tar.gz
google-crc32c==1.3.0
google-pasta==0.2.0
google-resumable-media==2.3.2
googleapis-common-protos==1.56.0
googledrivedownloader==0.4
graphviz==0.10.1
greenlet==1.1.2
grpc-google-iam-v1==0.12.3
grpcio==1.44.0
grpcio-gcp==0.2.2
grpcio-status==1.44.0
gspread==3.4.2
gspread-dataframe==3.0.8
gym==0.17.3
h5py==3.1.0
hdfs==2.6.0
HeapDict==1.0.1
hijri-converter==2.2.3
holidays==0.10.5.2
holoviews==1.14.8
html5lib==1.0.1
httpimport==0.5.18
httplib2==0.17.4
httplib2shim==0.0.3
humanize==0.5.1
hyperopt==0.1.2
ideep4py==2.0.0.post3
idna==2.10
imageio==2.4.1
imagesize==1.3.0
imbalanced-learn==0.8.1
imblearn==0.0
imgaug==0.2.9
importlib-metadata==4.11.3
importlib-resources==5.4.0
imutils==0.5.4
inflect==2.1.0
iniconfig==1.1.1
intel-openmp==2022.0.2
intervaltree==2.1.0
ipykernel==4.10.1
ipython==7.32.0
ipython-genutils==0.2.0
ipython-sql==0.3.9
ipywidgets==7.7.0
itsdangerous==1.1.0
jax==0.3.4
jaxlib @ https://storage.googleapis.com/jax-releases/cuda11/jaxlib-0.3.2+cuda11.cudnn805-cp37-none-manylinux2010_x86_64.whl
jedi==0.18.1
jieba==0.42.1
Jinja2==2.11.3
joblib==0.14.1
jpeg4py==0.1.4
jsonschema==3.2.0
jupyter==1.0.0
jupyter-client==5.3.5
jupyter-console==5.2.0
jupyter-core==4.9.2
jupyterlab-pygments==0.1.2
jupyterlab-widgets==1.1.0
kaggle==1.5.12
kapre==0.3.7
keras==2.8.0
Keras-Preprocessing==1.1.2
keras-tuner==1.1.1
keras-vis==0.4.1
kfp==1.8.11
kfp-pipeline-spec==0.1.13
kfp-server-api==1.8.1
kiwisolver==1.4.0
korean-lunar-calendar==0.2.1
kt-legacy==1.0.4
kubernetes==12.0.1
libclang==13.0.0
librosa==0.8.1
lightgbm==2.2.3
llvmlite==0.34.0
lmdb==0.99
LunarCalendar==0.0.9
lxml==4.2.6
Markdown==3.3.6
MarkupSafe==2.0.1
matplotlib==3.2.2
matplotlib-inline==0.1.3
matplotlib-venn==0.11.6
missingno==0.5.1
mistune==0.8.4
mizani==0.6.0
mkl==2019.0
ml-metadata==1.7.0
ml-pipelines-sdk==1.7.0
mlxtend==0.14.0
more-itertools==8.12.0
moviepy==0.2.3.5
mpmath==1.2.1
msgpack==1.0.3
multiprocess==0.70.12.2
multitasking==0.0.10
murmurhash==1.0.6
music21==5.5.0
natsort==5.5.0
nbclient==0.5.13
nbconvert==5.6.1
nbformat==5.2.0
nest-asyncio==1.5.4
netCDF4==1.5.8
networkx==2.6.3
nibabel==3.0.2
nltk==3.2.5
notebook==5.3.1
numba==0.51.2
numexpr==2.8.1
numpy==1.21.5
nvidia-ml-py3==7.352.0
oauth2client==4.1.3
oauthlib==3.2.0
okgrade==0.4.3
opencv-contrib-python==4.1.2.30
opencv-python==4.1.2.30
openpyxl==3.0.9
opt-einsum==3.3.0
orjson==3.6.7
osqp==0.6.2.post0
overrides==6.1.0
packaging==20.9
palettable==3.3.0
pandas==1.3.5
pandas-datareader==0.9.0
pandas-gbq==0.13.3
pandas-profiling==1.4.1
pandocfilters==1.5.0
panel==0.12.1
param==1.12.0
parso==0.8.3
pathlib==1.0.1
patsy==0.5.2
pep517==0.12.0
pexpect==4.8.0
pickleshare==0.7.5
Pillow==7.1.2
pip-tools==6.2.0
plac==1.1.3
plotly==5.5.0
plotnine==0.6.0
pluggy==0.7.1
pooch==1.6.0
portpicker==1.3.9
prefetch-generator==1.0.1
preshed==3.0.6
prettytable==3.2.0
progressbar2==3.38.0
prometheus-client==0.13.1
promise==2.3
prompt-toolkit==3.0.28
proto-plus==1.20.3
protobuf==3.19.4
psutil==5.4.8
psycopg2==2.7.6.1
ptyprocess==0.7.0
py==1.11.0
pyarrow==5.0.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycocotools==2.0.4
pycparser==2.21
pyct==0.4.8
pydantic==1.9.0
pydata-google-auth==1.4.0
pydot==1.3.0
pydot-ng==2.0.0
pydotplus==2.0.2
PyDrive==1.3.1
pyemd==0.5.1
pyerfa==2.0.0.1
pyfarmhash==0.3.2
pyglet==1.5.0
Pygments==2.6.1
PyGObject==3.26.1
pymc3==3.11.4
PyMeeus==0.5.11
pymongo==3.12.3
pymystem3==0.2.0
PyOpenGL==3.1.6
pyparsing==3.0.7
pyrsistent==0.18.1
pysndfile==1.3.8
PySocks==1.7.1
pystan==2.19.1.1
pytest==3.6.4
python-apt==0.0.0
python-chess==0.23.11
python-dateutil==2.8.2
python-louvain==0.16
python-slugify==6.1.1
python-utils==3.1.0
pytz==2022.1
pyviz-comms==2.1.0
PyWavelets==1.3.0
PyYAML==5.4.1
pyzmq==22.3.0
qdldl==0.1.5.post0
qtconsole==5.2.2
QtPy==2.0.1
regex==2019.12.20
requests==2.27.1
requests-oauthlib==1.3.1
requests-toolbelt==0.9.1
resampy==0.2.2
rpy2==3.4.5
rsa==4.8
scikit-image==0.18.3
scikit-learn==1.0.2
scipy==1.4.1
screen-resolution-extra==0.0.0
scs==3.2.0
seaborn==0.11.2
semver==2.13.0
Send2Trash==1.8.0
setuptools-git==1.2
Shapely==1.8.1.post1
simplegeneric==0.8.1
six==1.15.0
sklearn==0.0
sklearn-pandas==1.8.0
smart-open==5.2.1
snowballstemmer==2.2.0
sortedcontainers==2.4.0
SoundFile==0.10.3.post1
soupsieve==2.3.1
spacy==2.2.4
Sphinx==1.8.6
sphinxcontrib-serializinghtml==1.1.5
sphinxcontrib-websupport==1.2.4
SQLAlchemy==1.4.32
sqlparse==0.4.2
srsly==1.0.5
statsmodels==0.10.2
strip-hints==0.1.10
sympy==1.7.1
tables==3.7.0
tabulate==0.8.9
tblib==1.7.0
tenacity==8.0.1
tensorboard==2.8.0
tensorboard-data-server==0.6.1
tensorboard-plugin-wit==1.8.1
tensorflow @ file:///tensorflow-2.8.0-cp37-cp37m-linux_x86_64.whl
tensorflow-data-validation==1.7.0
tensorflow-datasets==4.0.1
tensorflow-estimator==2.8.0
tensorflow-gcs-config==2.8.0
tensorflow-hub==0.12.0
tensorflow-io-gcs-filesystem==0.24.0
tensorflow-metadata==1.7.0
tensorflow-model-analysis==0.38.0
tensorflow-probability==0.16.0
tensorflow-serving-api==2.8.0
tensorflow-transform==1.7.0
termcolor==1.1.0
terminado==0.13.3
testpath==0.6.0
text-unidecode==1.3
textblob==0.15.3
tf-estimator-nightly==2.8.0.dev2021122109
tfx==1.7.0
tfx-bsl==1.7.0
Theano-PyMC==1.1.2
thinc==7.4.0
threadpoolctl==3.1.0
tifffile==2021.11.2
tomli==2.0.1
toolz==0.11.2
torch @ https://download.pytorch.org/whl/cu111/torch-1.10.0%2Bcu111-cp37-cp37m-linux_x86_64.whl
torchaudio @ https://download.pytorch.org/whl/cu111/torchaudio-0.10.0%2Bcu111-cp37-cp37m-linux_x86_64.whl
torchsummary==1.5.1
torchtext==0.11.0
torchvision @ https://download.pytorch.org/whl/cu111/torchvision-0.11.1%2Bcu111-cp37-cp37m-linux_x86_64.whl
tornado==5.1.1
tqdm==4.63.0
traitlets==5.1.1
tweepy==3.10.0
typeguard==2.7.1
typer==0.4.0
typing-extensions==3.10.0.2
typing-utils==0.1.0
tzlocal==1.5.1
uritemplate==3.0.1
urllib3==1.24.3
vega-datasets==0.9.0
wasabi==0.9.0
wcwidth==0.2.5
webencodings==0.5.1
websocket-client==1.3.1
Werkzeug==1.0.1
widgetsnbextension==3.6.0
wordcloud==1.5.0
wrapt==1.14.0
xarray==0.18.2
xgboost==0.90
xkit==0.0.0
xlrd==1.1.0
xlwt==1.3.0
yellowbrick==1.4
zict==2.1.0
zipp==3.7.0
Describe the current behavior
When using KFP version 1.8.11 on Google Colab and running the pipeline with --runner=DataflowRunner in beam_pipeline_args, I get the error "ModuleNotFoundError: No module named 'user_module_0'". The full stack trace is in the attached screenshot.
Describe the expected behavior
The trainer module, taken straight from the tutorial with some minor alterations:
from typing import List, Text

from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10


def preprocessing_fn(inputs):
  outputs = {}
  for key in _FEATURE_KEYS:
    outputs[key] = tft.scale_to_z_score(inputs[key])
  # The tutorial has this stored as strings. I manually imported this into a
  # BQ table and the labels are ints.
  outputs[_LABEL_KEY] = inputs[_LABEL_KEY]
  return outputs


def _get_serve_tf_examples_fn(model, tf_transform_output):
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function(input_signature=[
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  ])
  def serve_tf_examples_fn(serialized_tf_examples):
    feature_spec = tf_transform_output.raw_feature_spec()
    required_feature_spec = {
        k: v for k, v in feature_spec.items() if k in _FEATURE_KEYS
    }
    parsed_features = tf.io.parse_example(serialized_tf_examples,
                                          required_feature_spec)
    transformed_features = model.tft_layer(parsed_features)
    return model(transformed_features)

  return serve_tf_examples_fn


def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  dataset = data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(batch_size=batch_size),
      schema=tf_transform_output.raw_metadata.schema)
  transform_layer = tf_transform_output.transform_features_layer()

  def apply_transform(raw_features):
    transformed_features = transform_layer(raw_features)
    transformed_label = transformed_features.pop(_LABEL_KEY)
    return transformed_features, transformed_label

  return dataset.map(apply_transform).repeat()


def _build_keras_model() -> tf.keras.Model:
  inputs = [
      keras.layers.Input(shape=(1,), name=key)
      for key in _FEATURE_KEYS
  ]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


def run_fn(fn_args: tfx.components.FnArgs):
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  signatures = {
      'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
The pipeline definition, also taken straight from the tutorial with minimal modifications:
from typing import List, Optional


def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool,
                     beam_pipeline_args: Optional[List[str]]) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  transform = tfx.components.Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      materialize=False,
      module_file=module_file)

  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      'machine_type': 'n1-standard-4',
  }

  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
              vertex_serving_spec,
      })

  components = [
      example_gen,
      statistics_gen,
      schema_gen,
      transform,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      beam_pipeline_args=beam_pipeline_args)
import os

# I queried from a different table that has the labels preprocessed as ints.
# I removed my project information here for privacy reasons.
QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

BIG_QUERY_WITH_DF_RUNNER_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
    '--runner=DataflowRunner',
    '--region=us-central1',
]

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        use_gpu=False,
        beam_pipeline_args=BIG_QUERY_WITH_DF_RUNNER_BEAM_PIPELINE_ARGS))
Standalone code to reproduce the issue
Run it in Google Colab.

For reference, I see these two issues are still not resolved: [1, 2]
I tried the suggested solution of setting force_tf_compat_v1=True. Still got the same error.
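For reference, here is roughly where that flag goes; a minimal sketch of the Transform component from the pipeline definition above with the suggested flag added:

transform = tfx.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    materialize=False,
    module_file=module_file,
    # Suggested workaround from the linked issues; it did not change the
    # error in my case.
    force_tf_compat_v1=True)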
It's also worth noting that my module is stored in GCS; module_file is a GCS URI.
In addition, I'm not importing anything, unlike the other two issues. I just have one trainer.py, and I'm just trying to run the tutorials.
@pindinagesh is this a known issue? I don't see any other discussions on it, but it happens to me on both 1.6.1 and 1.7.0.
Is your module file on GCS (accessible by cloud)?
Is your module file on GCS (accessible by cloud)?
You mean the Dataflow job doesn't have access to the GCS bucket?
If so, it's possible, but I also don't see any logs stating that. I'm assuming there'd be obvious permission-error logs.
For context, it only happens when I'm running with DataflowRunner. The direct runner completes just fine, so I don't see why the permission wouldn't be provisioned to the Dataflow runner.
It has. Your Transform component needs to access the module file; your Dataflow runner doesn't need to access the module file.
Seems you have the same issue as the one here.
So I tried setting force_tf_compat_v1=True; it didn't work for me.
It also seems, from the issue you linked, that it happened when the OP used import_utils.import_func_from_source. I'm not using that.
I'm also on 1.7.0. Is this issue still present? It seems like a very common use case, and I'm not doing anything complex.
So to confirm, your module file is on GCS, right?
So to confirm, your module file is on GCS, right?
Yes
Just curious, is this example (Kubeflow instead of Vertex) working for you?
Can you link me the Colab? I can try it.
@1025KB I tried the example. It did not work; I got the same error. It's worth noting that I had to switch the DAG runner from Kubeflow to KubeflowV2, because the V1 runner generates a YAML that Vertex AI for some reason doesn't accept. Maybe there's an easy fix, but I didn't have time to dig into it.
Also, I have a corporate GCP account, and I had to manually push the data root to my own GCS bucket; I don't have permission otherwise.
The KubeflowDagRunner should work because of this.
I created a PR to add that to KubeflowV2DagRunner, but currently there is a bug in PR sync, so the PR status became weird.
What I did is add the following to KubeflowV2DagRunner.run():
for component in pipeline.components:
  # TODO(b/187122662): Pass through pip dependencies as a first-class
  # component flag.
  if isinstance(component, tfx_base_component.BaseComponent):
    component._resolve_pip_dependencies(  # pylint: disable=protected-access
        pipeline.pipeline_info.pipeline_root)
Hmm. Sounds like I just need to switch KubeflowDagRunner to KubeflowV2DagRunner? Could you give me a quick summary of what's going on here?
Also, I'm not sure if Vertex AI just doesn't support KubeflowDagRunner or I'm missing something here, but pipelines on Vertex AI only support JSON, and KubeflowDagRunner doesn't seem to produce that.
KubeflowDagRunner is for Kubeflow; KubeflowV2DagRunner is for Vertex.
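For illustration, a minimal sketch of the two runners side by side (assuming the tfx.v1 export paths used elsewhere in this thread; the output file names are placeholders). KubeflowDagRunner compiles the pipeline into an Argo-based package for Kubeflow Pipelines, while KubeflowV2DagRunner emits the JSON pipeline spec that Vertex AI Pipelines expects:

from tfx import v1 as tfx

# Kubeflow Pipelines (KFP on a cluster): Argo-based package.
kfp_runner = tfx.orchestration.experimental.KubeflowDagRunner(
    config=tfx.orchestration.experimental.KubeflowDagRunnerConfig(),
    output_filename='pipeline.tar.gz')

# Vertex AI Pipelines: JSON pipeline definition.
vertex_runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename='pipeline.json')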
KubeflowDagRunner should work, and we found a potential fix [1] for KubeflowV2DagRunner. But you mentioned you saw the same "No module named 'user_module_0'" error on both Kubeflow and Vertex, so I'm not sure what's happening...
[1] Adding this to the KubeflowV2DagRunner.run function:
for component in pipeline.components:
  # TODO(b/187122662): Pass through pip dependencies as a first-class
  # component flag.
  if isinstance(component, tfx_base_component.BaseComponent):
    component._resolve_pip_dependencies(  # pylint: disable=protected-access
        pipeline.pipeline_info.pipeline_root)
But you mentioned you saw the same "No module named 'user_module_0'" error on both Kubeflow and Vertex
I didn't. I'm strictly using Vertex here.
I see, then I misunderstood.
For Vertex, before our PR is in, can you try adding that code to KubeflowV2DagRunner.run and retry?
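One way to try this without editing the installed TFX sources is to subclass the runner and apply the snippet in an overridden run; a minimal sketch (the import path for base_component is an assumption and may differ between TFX versions):

from tfx import v1 as tfx
from tfx.dsl.components.base import base_component as tfx_base_component


class PatchedKubeflowV2DagRunner(
    tfx.orchestration.experimental.KubeflowV2DagRunner):
  """KubeflowV2DagRunner with the proposed pip-dependency fix applied."""

  def run(self, pipeline, **kwargs):
    # Resolve each component's pip dependencies against the pipeline root
    # before compiling, as in the proposed PR.
    for component in pipeline.components:
      if isinstance(component, tfx_base_component.BaseComponent):
        component._resolve_pip_dependencies(  # pylint: disable=protected-access
            pipeline.pipeline_info.pipeline_root)
    return super().run(pipeline, **kwargs)

PatchedKubeflowV2DagRunner can then be used in place of KubeflowV2DagRunner when compiling the pipeline definition.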
OK, I can confirm it works. Do you know when the change will be released?
The next release, in about a month.
@1025KB So I'm now running into:
RuntimeError: The order of analyzers in your `preprocessing_fn` appears to be non-deterministic. This can be fixed either by changing your `preprocessing_fn` such that tf.Transform analyzers are encountered in a deterministic order or by passing a unique name to each analyzer API call.
It might just be my own code here, but I can't find what could possibly be non-deterministic, and this only appears when I add the --runner=DataflowRunner flag. For reference, my beam_pipeline_args looks like:
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
    '--runner=DataflowRunner',
    '--region=us-central1',
    '--experiments=upload_graph',  # upload_graph must be enabled
    '--dataflow_service_options=enable_prime',
    '--autoscaling_algorithm=THROUGHPUT_BASED',
]
And my preprocessing_fn (redacted for conciseness):
_FEATURES = [
    # list of str
]

_SPECIAL_IMPUTE = {
    'special_foo': 1,
}

HOURS = [1, 2, 3, 4]

TABLE_KEYS = {
    'XXX': ['XXX_1', 'XXX_2', 'XXX_3'],
    'YYY': ['YYY_1', 'YYY_2', 'YYY_3'],
}


@tf.function
def _divide(a, b):
  return tf.math.divide_no_nan(tf.cast(a, tf.float32), tf.cast(b, tf.float32))


def preprocessing_fn(inputs):
  x = {}
  for name, tensor in sorted(inputs.items()):
    if tensor.dtype == tf.bool:
      tensor = tf.cast(tensor, tf.int64)
    if isinstance(tensor, tf.sparse.SparseTensor):
      default_value = '' if tensor.dtype == tf.string else 0
      tensor = tft.sparse_tensor_to_dense_with_shape(tensor, [None, 1], default_value)
    x[name] = tensor

  x['foo'] = _divide((x['foo1'] - x['foo2']), x['foo_denom'])
  x['bar'] = tf.cast(x['bar'] > 0, tf.int64)

  for hour in HOURS:
    total = tf.constant(0, dtype=tf.int64)
    for device_type in DEVICE_TYPES.keys():
      total = total + x[f'some_device_{device_type}_{hour}h']

  # one hot encode categorical values
  for name, keys in TABLE_KEYS.items():
    with tf.init_scope():
      initializer = tf.lookup.KeyValueTensorInitializer(
          tf.constant(keys),
          tf.constant([i for i in range(len(keys))]))
      table = tf.lookup.StaticHashTable(initializer, default_value=-1)
    indices = table.lookup(tf.squeeze(x[name], axis=1))
    one_hot = tf.one_hot(indices, len(keys), dtype=tf.int64)
    for i, _tensor in enumerate(tf.split(one_hot, num_or_size_splits=len(keys), axis=1)):
      x[f'{name}_{keys[i]}'] = _tensor

  return {name: tft.scale_to_0_1(x[name]) for name in _FEATURES}
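If the complaint really is about analyzer ordering, one thing worth trying (a sketch, not a confirmed fix) is passing a unique name to each analyzer call, since tft mappers such as scale_to_0_1 accept an optional name argument:

# Hypothetical variant of the last line of preprocessing_fn: naming each
# analyzer call uniquely so the traversal order no longer matters.
return {
    name: tft.scale_to_0_1(x[name], name=f'scale_to_0_1_{name}')
    for name in _FEATURES
}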
Could you open a separate issue under tft? Thanks!
@1025KB is this fixed in the latest release?