No module named 'user_module_0' kubeflow using DataflowRunner

Open feelingsonice opened this issue 3 years ago • 24 comments

If the bug is related to a specific library below, please raise an issue in the respective repo directly:

TensorFlow Data Validation Repo

TensorFlow Model Analysis Repo

TensorFlow Transform Repo

TensorFlow Serving Repo

System information

  • Have I specified the code to reproduce the issue (Yes, No): Yes
  • Environment in which the code is executed (e.g., Local(Linux/MacOS/Windows), Interactive Notebook, Google Cloud, etc): Google Colab
  • TensorFlow version: 2.8.0
  • TFX Version: 1.7.0
  • Python version: 3.7.12
  • Python dependencies (from pip freeze output):
absl-py==1.0.0
alabaster==0.7.12
albumentations==0.1.12
altair==4.2.0
apache-beam==2.37.0
appdirs==1.4.4
argon2-cffi==21.3.0
argon2-cffi-bindings==21.2.0
arviz==0.11.4
astor==0.8.1
astropy==4.3.1
astunparse==1.6.3
atari-py==0.2.9
atomicwrites==1.4.0
attrs==20.3.0
audioread==2.1.9
autograd==1.3
Babel==2.9.1
backcall==0.2.0
beautifulsoup4==4.6.3
bleach==4.1.0
blis==0.4.1
bokeh==2.3.3
Bottleneck==1.3.4
branca==0.4.2
bs4==0.0.1
CacheControl==0.12.10
cached-property==1.5.2
cachetools==4.2.4
catalogue==1.0.0
certifi==2021.10.8
cffi==1.15.0
cftime==1.6.0
chardet==3.0.4
charset-normalizer==2.0.12
click==7.1.2
cloudpickle==2.0.0
cmake==3.12.0
cmdstanpy==0.9.5
colorcet==3.0.0
colorlover==0.3.0
community==1.0.0b1
contextlib2==0.5.5
convertdate==2.4.0
coverage==3.7.1
coveralls==0.5
crcmod==1.7
cufflinks==0.17.3
cvxopt==1.2.7
cvxpy==1.0.31
cycler==0.11.0
cymem==2.0.6
Cython==0.29.28
daft==0.0.4
dask==2.12.0
datascience==0.10.6
debugpy==1.0.0
decorator==4.4.2
defusedxml==0.7.1
Deprecated==1.2.13
descartes==1.1.0
dill==0.3.1.1
distributed==1.25.3
dlib @ file:///dlib-19.18.0-cp37-cp37m-linux_x86_64.whl
dm-tree==0.1.6
docker==4.4.4
docopt==0.6.2
docstring-parser==0.13
docutils==0.17.1
dopamine-rl==1.0.5
earthengine-api==0.1.303
easydict==1.9
ecos==2.0.10
editdistance==0.5.3
en-core-web-sm @ https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-2.2.5/en_core_web_sm-2.2.5.tar.gz
entrypoints==0.4
ephem==4.1.3
et-xmlfile==1.1.0
fa2==0.3.5
fastai==1.0.61
fastavro==1.4.10
fastdtw==0.3.4
fasteners==0.17.3
fastprogress==1.0.2
fastrlock==0.8
fbprophet==0.7.1
feather-format==0.4.1
filelock==3.6.0
fire==0.4.0
firebase-admin==4.4.0
fix-yahoo-finance==0.0.22
Flask==1.1.4
flatbuffers==2.0
folium==0.8.3
future==0.16.0
gast==0.5.3
GDAL==2.2.2
gdown==4.2.2
gensim==3.6.0
geographiclib==1.52
geopy==1.17.0
gin-config==0.5.0
glob2==0.7
google==2.0.3
google-api-core==1.31.5
google-api-python-client==1.12.11
google-apitools==0.5.31
google-auth==1.35.0
google-auth-httplib2==0.0.4
google-auth-oauthlib==0.4.6
google-cloud-aiplatform==1.11.0
google-cloud-bigquery==2.34.2
google-cloud-bigquery-storage==2.13.0
google-cloud-bigtable==1.7.0
google-cloud-core==1.7.2
google-cloud-datastore==1.8.0
google-cloud-dlp==3.6.2
google-cloud-firestore==1.7.0
google-cloud-language==1.3.0
google-cloud-pubsub==2.11.0
google-cloud-pubsublite==1.4.1
google-cloud-recommendations-ai==0.2.0
google-cloud-spanner==1.19.1
google-cloud-storage==1.44.0
google-cloud-translate==1.5.0
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-colab @ file:///colabtools/dist/google-colab-1.0.0.tar.gz
google-crc32c==1.3.0
google-pasta==0.2.0
google-resumable-media==2.3.2
googleapis-common-protos==1.56.0
googledrivedownloader==0.4
graphviz==0.10.1
greenlet==1.1.2
grpc-google-iam-v1==0.12.3
grpcio==1.44.0
grpcio-gcp==0.2.2
grpcio-status==1.44.0
gspread==3.4.2
gspread-dataframe==3.0.8
gym==0.17.3
h5py==3.1.0
hdfs==2.6.0
HeapDict==1.0.1
hijri-converter==2.2.3
holidays==0.10.5.2
holoviews==1.14.8
html5lib==1.0.1
httpimport==0.5.18
httplib2==0.17.4
httplib2shim==0.0.3
humanize==0.5.1
hyperopt==0.1.2
ideep4py==2.0.0.post3
idna==2.10
imageio==2.4.1
imagesize==1.3.0
imbalanced-learn==0.8.1
imblearn==0.0
imgaug==0.2.9
importlib-metadata==4.11.3
importlib-resources==5.4.0
imutils==0.5.4
inflect==2.1.0
iniconfig==1.1.1
intel-openmp==2022.0.2
intervaltree==2.1.0
ipykernel==4.10.1
ipython==7.32.0
ipython-genutils==0.2.0
ipython-sql==0.3.9
ipywidgets==7.7.0
itsdangerous==1.1.0
jax==0.3.4
jaxlib @ https://storage.googleapis.com/jax-releases/cuda11/jaxlib-0.3.2+cuda11.cudnn805-cp37-none-manylinux2010_x86_64.whl
jedi==0.18.1
jieba==0.42.1
Jinja2==2.11.3
joblib==0.14.1
jpeg4py==0.1.4
jsonschema==3.2.0
jupyter==1.0.0
jupyter-client==5.3.5
jupyter-console==5.2.0
jupyter-core==4.9.2
jupyterlab-pygments==0.1.2
jupyterlab-widgets==1.1.0
kaggle==1.5.12
kapre==0.3.7
keras==2.8.0
Keras-Preprocessing==1.1.2
keras-tuner==1.1.1
keras-vis==0.4.1
kfp==1.8.11
kfp-pipeline-spec==0.1.13
kfp-server-api==1.8.1
kiwisolver==1.4.0
korean-lunar-calendar==0.2.1
kt-legacy==1.0.4
kubernetes==12.0.1
libclang==13.0.0
librosa==0.8.1
lightgbm==2.2.3
llvmlite==0.34.0
lmdb==0.99
LunarCalendar==0.0.9
lxml==4.2.6
Markdown==3.3.6
MarkupSafe==2.0.1
matplotlib==3.2.2
matplotlib-inline==0.1.3
matplotlib-venn==0.11.6
missingno==0.5.1
mistune==0.8.4
mizani==0.6.0
mkl==2019.0
ml-metadata==1.7.0
ml-pipelines-sdk==1.7.0
mlxtend==0.14.0
more-itertools==8.12.0
moviepy==0.2.3.5
mpmath==1.2.1
msgpack==1.0.3
multiprocess==0.70.12.2
multitasking==0.0.10
murmurhash==1.0.6
music21==5.5.0
natsort==5.5.0
nbclient==0.5.13
nbconvert==5.6.1
nbformat==5.2.0
nest-asyncio==1.5.4
netCDF4==1.5.8
networkx==2.6.3
nibabel==3.0.2
nltk==3.2.5
notebook==5.3.1
numba==0.51.2
numexpr==2.8.1
numpy==1.21.5
nvidia-ml-py3==7.352.0
oauth2client==4.1.3
oauthlib==3.2.0
okgrade==0.4.3
opencv-contrib-python==4.1.2.30
opencv-python==4.1.2.30
openpyxl==3.0.9
opt-einsum==3.3.0
orjson==3.6.7
osqp==0.6.2.post0
overrides==6.1.0
packaging==20.9
palettable==3.3.0
pandas==1.3.5
pandas-datareader==0.9.0
pandas-gbq==0.13.3
pandas-profiling==1.4.1
pandocfilters==1.5.0
panel==0.12.1
param==1.12.0
parso==0.8.3
pathlib==1.0.1
patsy==0.5.2
pep517==0.12.0
pexpect==4.8.0
pickleshare==0.7.5
Pillow==7.1.2
pip-tools==6.2.0
plac==1.1.3
plotly==5.5.0
plotnine==0.6.0
pluggy==0.7.1
pooch==1.6.0
portpicker==1.3.9
prefetch-generator==1.0.1
preshed==3.0.6
prettytable==3.2.0
progressbar2==3.38.0
prometheus-client==0.13.1
promise==2.3
prompt-toolkit==3.0.28
proto-plus==1.20.3
protobuf==3.19.4
psutil==5.4.8
psycopg2==2.7.6.1
ptyprocess==0.7.0
py==1.11.0
pyarrow==5.0.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycocotools==2.0.4
pycparser==2.21
pyct==0.4.8
pydantic==1.9.0
pydata-google-auth==1.4.0
pydot==1.3.0
pydot-ng==2.0.0
pydotplus==2.0.2
PyDrive==1.3.1
pyemd==0.5.1
pyerfa==2.0.0.1
pyfarmhash==0.3.2
pyglet==1.5.0
Pygments==2.6.1
PyGObject==3.26.1
pymc3==3.11.4
PyMeeus==0.5.11
pymongo==3.12.3
pymystem3==0.2.0
PyOpenGL==3.1.6
pyparsing==3.0.7
pyrsistent==0.18.1
pysndfile==1.3.8
PySocks==1.7.1
pystan==2.19.1.1
pytest==3.6.4
python-apt==0.0.0
python-chess==0.23.11
python-dateutil==2.8.2
python-louvain==0.16
python-slugify==6.1.1
python-utils==3.1.0
pytz==2022.1
pyviz-comms==2.1.0
PyWavelets==1.3.0
PyYAML==5.4.1
pyzmq==22.3.0
qdldl==0.1.5.post0
qtconsole==5.2.2
QtPy==2.0.1
regex==2019.12.20
requests==2.27.1
requests-oauthlib==1.3.1
requests-toolbelt==0.9.1
resampy==0.2.2
rpy2==3.4.5
rsa==4.8
scikit-image==0.18.3
scikit-learn==1.0.2
scipy==1.4.1
screen-resolution-extra==0.0.0
scs==3.2.0
seaborn==0.11.2
semver==2.13.0
Send2Trash==1.8.0
setuptools-git==1.2
Shapely==1.8.1.post1
simplegeneric==0.8.1
six==1.15.0
sklearn==0.0
sklearn-pandas==1.8.0
smart-open==5.2.1
snowballstemmer==2.2.0
sortedcontainers==2.4.0
SoundFile==0.10.3.post1
soupsieve==2.3.1
spacy==2.2.4
Sphinx==1.8.6
sphinxcontrib-serializinghtml==1.1.5
sphinxcontrib-websupport==1.2.4
SQLAlchemy==1.4.32
sqlparse==0.4.2
srsly==1.0.5
statsmodels==0.10.2
strip-hints==0.1.10
sympy==1.7.1
tables==3.7.0
tabulate==0.8.9
tblib==1.7.0
tenacity==8.0.1
tensorboard==2.8.0
tensorboard-data-server==0.6.1
tensorboard-plugin-wit==1.8.1
tensorflow @ file:///tensorflow-2.8.0-cp37-cp37m-linux_x86_64.whl
tensorflow-data-validation==1.7.0
tensorflow-datasets==4.0.1
tensorflow-estimator==2.8.0
tensorflow-gcs-config==2.8.0
tensorflow-hub==0.12.0
tensorflow-io-gcs-filesystem==0.24.0
tensorflow-metadata==1.7.0
tensorflow-model-analysis==0.38.0
tensorflow-probability==0.16.0
tensorflow-serving-api==2.8.0
tensorflow-transform==1.7.0
termcolor==1.1.0
terminado==0.13.3
testpath==0.6.0
text-unidecode==1.3
textblob==0.15.3
tf-estimator-nightly==2.8.0.dev2021122109
tfx==1.7.0
tfx-bsl==1.7.0
Theano-PyMC==1.1.2
thinc==7.4.0
threadpoolctl==3.1.0
tifffile==2021.11.2
tomli==2.0.1
toolz==0.11.2
torch @ https://download.pytorch.org/whl/cu111/torch-1.10.0%2Bcu111-cp37-cp37m-linux_x86_64.whl
torchaudio @ https://download.pytorch.org/whl/cu111/torchaudio-0.10.0%2Bcu111-cp37-cp37m-linux_x86_64.whl
torchsummary==1.5.1
torchtext==0.11.0
torchvision @ https://download.pytorch.org/whl/cu111/torchvision-0.11.1%2Bcu111-cp37-cp37m-linux_x86_64.whl
tornado==5.1.1
tqdm==4.63.0
traitlets==5.1.1
tweepy==3.10.0
typeguard==2.7.1
typer==0.4.0
typing-extensions==3.10.0.2
typing-utils==0.1.0
tzlocal==1.5.1
uritemplate==3.0.1
urllib3==1.24.3
vega-datasets==0.9.0
wasabi==0.9.0
wcwidth==0.2.5
webencodings==0.5.1
websocket-client==1.3.1
Werkzeug==1.0.1
widgetsnbextension==3.6.0
wordcloud==1.5.0
wrapt==1.14.0
xarray==0.18.2
xgboost==0.90
xkit==0.0.0
xlrd==1.1.0
xlwt==1.3.0
yellowbrick==1.4
zict==2.1.0
zipp==3.7.0

Describe the current behavior

When using KFP version 1.8.11 on Google Colab and running the pipeline with --runner=DataflowRunner in beam_pipeline_args, I get the error "ModuleNotFoundError: No module named 'user_module_0'". The full stack trace is in the attached screenshot.

Describe the expected behavior

Trainer module. This is taken straight from the tutorial with some minor alterations:

from typing import List, Text
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10


def preprocessing_fn(inputs):
  outputs = {}

  for key in _FEATURE_KEYS:
    outputs[key] = tft.scale_to_z_score(inputs[key])

  # the tutorial has this stored as strings. I manually imported this into a BQ table and the labels are ints
  outputs[_LABEL_KEY] = inputs[_LABEL_KEY]
  return outputs


def _get_serve_tf_examples_fn(model, tf_transform_output):
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function(input_signature=[
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  ])
  def serve_tf_examples_fn(serialized_tf_examples):
    feature_spec = tf_transform_output.raw_feature_spec()
    required_feature_spec = {
        k: v for k, v in feature_spec.items() if k in _FEATURE_KEYS
    }
    parsed_features = tf.io.parse_example(serialized_tf_examples,
                                          required_feature_spec)

    transformed_features = model.tft_layer(parsed_features)

    return model(transformed_features)

  return serve_tf_examples_fn


def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  dataset = data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(batch_size=batch_size),
      schema=tf_transform_output.raw_metadata.schema)

  transform_layer = tf_transform_output.transform_features_layer()
  def apply_transform(raw_features):
    transformed_features = transform_layer(raw_features)
    transformed_label = transformed_features.pop(_LABEL_KEY)
    return transformed_features, transformed_label

  return dataset.map(apply_transform).repeat()


def _build_keras_model() -> tf.keras.Model:
  inputs = [
      keras.layers.Input(shape=(1,), name=key)
      for key in _FEATURE_KEYS
  ]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


def run_fn(fn_args: tfx.components.FnArgs):
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  signatures = {
      'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

The pipeline definition, also taken straight from the tutorial with minimal modifications:

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool,
                     beam_pipeline_args: Optional[List[str]]) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
  
  transform = tfx.components.Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      materialize=False,
      module_file=module_file)
  
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      'machine_type': 'n1-standard-4',
  }

  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      statistics_gen,
      schema_gen,
      transform,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      beam_pipeline_args=beam_pipeline_args)

import os

# I queried from a different table that has the labels preprocessed as ints
# I removed my project information here for privacy reasons
QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
BIG_QUERY_WITH_DF_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   '--runner=DataflowRunner',
   '--region=us-central1',
   ]

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        use_gpu=False,
        beam_pipeline_args=BIG_QUERY_WITH_DF_RUNNER_BEAM_PIPELINE_ARGS))
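
The Beam arguments used above are plain strings, so they can be assembled by a small helper. The following is only a minimal sketch and not part of the tutorial: `build_dataflow_args` and its parameters are hypothetical names. It uses `posixpath.join` rather than `os.path.join` so that the `gs://` URI keeps forward slashes regardless of the operating system that compiles the pipeline.

```python
import posixpath
from typing import List


def build_dataflow_args(project: str, bucket: str,
                        region: str = 'us-central1') -> List[str]:
  """Returns Beam flags that select DataflowRunner (hypothetical helper)."""
  # posixpath.join always joins with '/', which is what a gs:// URI needs.
  temp_location = posixpath.join('gs://', bucket, 'tmp')
  return [
      '--project=' + project,
      '--temp_location=' + temp_location,
      '--runner=DataflowRunner',
      '--region=' + region,
  ]
```

The returned list could be passed as beam_pipeline_args in place of the hand-written one.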

Standalone code to reproduce the issue

Run it in Google Colab.

Screen Shot 2022-03-23 at 5 47 27 PM


For reference, I see these two issues are still not resolved: [1, 2]

I tried the suggested solution of setting force_tf_compat_v1=True. I still got the same error.

It's also worth noting that my module is stored in GCS; module_file is a GCS URI.

In addition, unlike in the other two issues, I'm not importing anything. I just have one trainer.py and I'm just trying to run the tutorials.

feelingsonice avatar Mar 24 '22 00:03 feelingsonice

@pindinagesh is this a known issue? I don't see any other discussions on it, but it happens to me on both 1.6.1 and 1.7.0.

feelingsonice avatar Mar 24 '22 19:03 feelingsonice

Is your module file on GCS (accessible by cloud)?

1025KB avatar Mar 24 '22 19:03 1025KB

Is your module file on GCS (accessible by cloud)?

You mean the Dataflow job doesn't have access to the GCS bucket?

If so, it's possible, but I also don't see any logs stating that. I'd assume there would be obvious permission error logs.

feelingsonice avatar Mar 24 '22 19:03 feelingsonice

For context it only happens when I'm running it with DataflowRunner. The direct runner completes just fine so I don't see why the permission wouldn't be provisioned to the dataflow runner.

feelingsonice avatar Mar 24 '22 19:03 feelingsonice

It has. Your Transform component needs to access the module file; your Dataflow runner doesn't need to access the module file.

1025KB avatar Mar 24 '22 19:03 1025KB

It seems you have the same issue as in here

1025KB avatar Mar 24 '22 19:03 1025KB

So I tried setting force_tf_compat_v1=True, but it didn't work for me.

feelingsonice avatar Mar 24 '22 19:03 feelingsonice

It also seems, from the issue you linked, that it happened when the OP used import_utils.import_func_from_source. I'm not using that.

I'm also on 1.7.0. Is this issue still present? This seems like a very common use case and I'm not doing anything complex.

feelingsonice avatar Mar 24 '22 19:03 feelingsonice

so to confirm, your module file is on GCS, right?

1025KB avatar Mar 24 '22 19:03 1025KB

so to confirm, your module file is on GCS, right?

Yes

feelingsonice avatar Mar 24 '22 19:03 feelingsonice

Just curious, is this example (Kubeflow instead of Vertex) working for you?

1025KB avatar Mar 24 '22 20:03 1025KB

Can you link me the colab? I can try it.

feelingsonice avatar Mar 24 '22 20:03 feelingsonice

@1025KB I tried the example. It did not work; I got the same error. It's worth noting, though, that I had to switch the DAG runner from Kubeflow to KubeflowV2, because the V1 runner generates a YAML file that Vertex AI for some reason doesn't accept. Maybe there's an easy fix, but I didn't have time to dig into it.

feelingsonice avatar Mar 24 '22 22:03 feelingsonice

Also, I have a corporate GCP account and I had to manually push the data root to my own GCS bucket. I don't have permission otherwise.

feelingsonice avatar Mar 24 '22 22:03 feelingsonice

The KubeflowDagRunner should work because of this

I created a PR to add that to the KubeflowV2DagRunner, but there is currently a bug in PR sync, so the PR status became weird.

What I did was add the following to KubeflowV2DagRunner.run():

for component in pipeline.components:
  # TODO(b/187122662): Pass through pip dependencies as a first-class
  # component flag.
  if isinstance(component, tfx_base_component.BaseComponent):
    component._resolve_pip_dependencies(  # pylint: disable=protected-access
        pipeline.pipeline_info.pipeline_root)

1025KB avatar Mar 24 '22 22:03 1025KB

Hmm. Sounds like I just need to switch kubeflowDagRunner to KubeflowV2DagRunner? Could you give me a quick summary of what's going on here?

Also, I'm not sure whether Vertex AI just doesn't support KubeflowDagRunner or I'm missing something here, but pipelines on Vertex AI only support JSON, and KubeflowDagRunner doesn't seem to produce that.

feelingsonice avatar Mar 24 '22 22:03 feelingsonice

KubeflowDagRunner is for Kubeflow; KubeflowV2DagRunner is for Vertex.

KubeflowDagRunner should work, and we found a potential fix[1] for KubeflowV2DagRunner. But you mentioned you saw the same "No module named 'user_module_0'" error on both Kubeflow and Vertex; in that case I'm not sure what's happening...

[1] Adding this to KubeflowV2DagRunner.run function:

for component in pipeline.components:
  # TODO(b/187122662): Pass through pip dependencies as a first-class
  # component flag.
  if isinstance(component, tfx_base_component.BaseComponent):
    component._resolve_pip_dependencies(  # pylint: disable=protected-access
        pipeline.pipeline_info.pipeline_root)

1025KB avatar Mar 24 '22 22:03 1025KB

But you mentioned you saw the same "No module named 'user_module_0'" error on both Kubeflow and Vertex

I didn't. I'm strictly using vertex here.

feelingsonice avatar Mar 24 '22 22:03 feelingsonice

I see, then I misunderstood.

For Vertex, before our PR is in, can you try adding that code to KubeflowV2DagRunner.run and retry?
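
For readers who would rather not edit the installed TFX source, the loop quoted above can also be wrapped in a standalone function. This is only a sketch under stated assumptions: `resolve_pip_dependencies` is a hypothetical helper name, the class passed as `base_component_cls` is assumed to be the `BaseComponent` class the snippet refers to as `tfx_base_component.BaseComponent`, and `_resolve_pip_dependencies` is a protected method that may change between releases.

```python
from typing import Any, List


def resolve_pip_dependencies(pipeline: Any,
                             base_component_cls: type) -> List[Any]:
  """Runs the loop quoted in this thread outside of KubeflowV2DagRunner.run().

  Args:
    pipeline: object exposing `.components` and
      `.pipeline_info.pipeline_root` (a TFX pipeline is assumed).
    base_component_cls: class used for the isinstance check (assumed to be
      the TFX BaseComponent class from the quoted snippet).

  Returns:
    The components whose pip dependencies were resolved.
  """
  pipeline_root = pipeline.pipeline_info.pipeline_root
  resolved = []
  for component in pipeline.components:
    if isinstance(component, base_component_cls):
      # Same protected call as in the quoted snippet.
      component._resolve_pip_dependencies(pipeline_root)  # pylint: disable=protected-access
      resolved.append(component)
  return resolved
```

Under those assumptions the helper would be called on the pipeline object right before runner.run(...). Whether that is equivalent to patching run() itself is not confirmed in this thread.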

1025KB avatar Mar 24 '22 22:03 1025KB

Ok can confirm it works. Do you know when the change will be released?

feelingsonice avatar Mar 24 '22 23:03 feelingsonice

The next release, in about a month.

1025KB avatar Mar 24 '22 23:03 1025KB

@1025KB So I'm now running into:

RuntimeError: The order of analyzers in your `preprocessing_fn` appears to be non-deterministic. This can be fixed either by changing your `preprocessing_fn` such that tf.Transform analyzers are encountered in a deterministic order or by passing a unique name to each analyzer API call.

It might just be my own code here, but I can't find what could possibly be non-deterministic, and this only appears when I add the --runner=DataflowRunner flag. For reference, my beam_pipeline_args look like:

BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   '--runner=DataflowRunner',
   '--region=us-central1',
   '--experiments=upload_graph',  # upload_graph must be enabled
   '--dataflow_service_options=enable_prime',
   '--autoscaling_algorithm=THROUGHPUT_BASED',
   ]

And my preprocessing_fn (redacted for conciseness):

_FEATURES = [# list of str
]
_SPECIAL_IMPUTE = {
    'special_foo': 1,
}
HOURS = [1, 2, 3, 4]
TABLE_KEYS = {
    'XXX': ['XXX_1', 'XXX_2', 'XXX_3'],
    'YYY': ['YYY_1', 'YYY_2', 'YYY_3'],
}

@tf.function
def _divide(a, b):
  return tf.math.divide_no_nan(tf.cast(a, tf.float32), tf.cast(b, tf.float32))

def preprocessing_fn(inputs):
  x = {}

  for name, tensor in sorted(inputs.items()):
    if tensor.dtype == tf.bool:
      tensor = tf.cast(tensor, tf.int64)

    if isinstance(tensor, tf.sparse.SparseTensor):
      default_value = '' if tensor.dtype == tf.string else 0
      tensor = tft.sparse_tensor_to_dense_with_shape(tensor, [None, 1], default_value)
    
    x[name] = tensor

  x['foo'] = _divide((x['foo1'] - x['foo2']), x['foo_denom'])
  x['bar'] = tf.cast(x['bar'] > 0, tf.int64)

  for hour in HOURS:
    total = tf.constant(0, dtype=tf.int64)
    for device_type in DEVICE_TYPES.keys():
      total = total + x[f'some_device_{device_type}_{hour}h']

  # one hot encode categorical values
  for name, keys in TABLE_KEYS.items():
    with tf.init_scope():
      initializer = tf.lookup.KeyValueTensorInitializer(
          tf.constant(keys), 
          tf.constant([i for i in range(len(keys))]))
      table = tf.lookup.StaticHashTable(initializer, default_value=-1)
      
    indices = table.lookup(tf.squeeze(x[name], axis=1))
    one_hot = tf.one_hot(indices, len(keys), dtype=tf.int64)

    for i, _tensor in enumerate(tf.split(one_hot, num_or_size_splits=len(keys), axis=1)):
      x[f'{name}_{keys[i]}'] = _tensor

  return {name: tft.scale_to_0_1(x[name]) for name in _FEATURES}
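
The error message above suggests passing a unique name to each analyzer API call. A minimal sketch of that idea follows, assuming the `name` argument of `tft.scale_to_0_1` is the place to pass it. `unique_analyzer_names` is a hypothetical helper, and whether this resolves the error in this pipeline is not established in the thread.

```python
import re
from typing import Dict, Iterable


def unique_analyzer_names(features: Iterable[str],
                          prefix: str = 'scale_to_0_1') -> Dict[str, str]:
  """Maps each feature to a unique, deterministic name (hypothetical helper)."""
  names = {}
  seen = set()
  for feature in features:
    # Replace characters that are unlikely to be valid in a TF name scope.
    safe = re.sub(r'[^A-Za-z0-9_.]', '_', feature)
    name = f'{prefix}_{safe}'
    if name in seen:
      raise ValueError(f'Duplicate analyzer name: {name}')
    seen.add(name)
    names[feature] = name
  return names


# Intended use inside preprocessing_fn (not executed here, it needs tft):
#   names = unique_analyzer_names(_FEATURES)
#   return {f: tft.scale_to_0_1(x[f], name=names[f]) for f in _FEATURES}
```

The names depend only on the feature strings, so they come out the same on every run and on every worker.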

feelingsonice avatar Mar 25 '22 06:03 feelingsonice

Could you open a separate issue under tft? Thanks!

1025KB avatar Mar 25 '22 16:03 1025KB

@1025KB is this fixed in the latest release?

feelingsonice avatar May 28 '22 17:05 feelingsonice