flyte icon indicating copy to clipboard operation
flyte copied to clipboard

[BUG] Task to fails to import modules in `pyflyte-map-execute` mode but works in `pyflyte-execute` mode

Open debajyoti-truefoundry opened this issue 2 years ago • 3 comments

Describe the bug

  ~/work/pipelines/..         │I   1 import os     ■ Missing module docstring
    .git                    │    2 from functools import partial
    flyte                   │    3 from pathlib import Path
      installation          │    4 from typing import List, Tuple
      my_workflow           │    5
        __pycache__         │E   6 import flytekit     ■ Unable to import 'flytekit'
        my_module           │E   7 from flytekit import (     ■ Unable to import 'flytekit'
          __pycache__       │    8     ImageSpec,
           __init__.py       │    9     PodTemplate,
           my_module.py      │   10     Resources,
         __init__.py         │   11     conditional,
         register.sh         │   12     map_task,
         run.sh              │   13     task,
         train_model_workflow│   14     workflow,
       .gitignore            │   15 )
       config.yaml           │E  16 from flytekit.types.directory import FlyteDirectory     ■ Unable to import 'flytekit.types.directory'
       requirements.txt      │E  17 from kubernetes import client     ■ Unable to import 'kubernetes'
                              │   18 from my_workflow.my_module.my_module import random

This is what my current folder structure looks like, and on line 18, I have imported a module.

My workflow file looks like this,

import os
from functools import partial
from pathlib import Path
from typing import List, Tuple

import flytekit
from flytekit import (
    ImageSpec,
    PodTemplate,
    Resources,
    conditional,
    map_task,
    task,
    workflow,
)
from flytekit.types.directory import FlyteDirectory
from kubernetes import client
from my_workflow.my_module.my_module import random

nvml_image_spec = ImageSpec(
    base_image="python:3.9-slim",
    packages=["flytekit==1.10.3", "pynvml==11.5.0"],
    registry="redacted",
    name="redacted-pytorch",
    source_root="..",
)


normal_image = ImageSpec(
    base_image="python:3.9-slim",
    packages=["flytekit==1.10.3"],
    registry="redacted",
    name="redacted-normal",
    source_root="..",
)

cpu_resource = Resources(mem="300Mi", cpu="0.1")
gpu_resource = Resources(mem="900Mi", cpu="0.1", gpu="1")
gpu_pod_template = PodTemplate(
    pod_spec=client.V1PodSpec(
        containers=[],
        affinity=client.V1Affinity(
            node_affinity=client.V1NodeAffinity(
                required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
                    node_selector_terms=[
                        client.V1NodeSelectorTerm(
                            match_expressions=[
                                client.V1NodeSelectorRequirement(
                                    key="karpenter.k8s.aws/instance-family",
                                    operator="In",
                                    values=["g4dn"],
                                )
                            ]
                        )
                    ]
                )
            )
        ),
    )
)


@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def should_train_tokenizer(tokenizer: str) -> bool:
    return not bool(tokenizer)


@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def train_tokenizer() -> str:
    return "trained_tokenizer"


@task(
    container_image=nvml_image_spec,
    requests=gpu_resource,
    limits=gpu_resource,
    pod_template=gpu_pod_template,
    # accelerator=flytekit.extras.accelerators.T4,
)
def train_model(tokenizer: str) -> FlyteDirectory:
    from pynvml import nvmlInit, nvmlDeviceGetCount

    nvmlInit()
    assert nvmlDeviceGetCount() > 0

    random.random()
    working_dir = flytekit.current_context().working_directory
    local_dir = Path(os.path.join(working_dir, "csv_files"))
    local_dir.mkdir(exist_ok=True)

    with open(os.path.join(local_dir, "model"), "w", encoding="utf-8") as f:
        f.write(tokenizer)

    return FlyteDirectory(path=str(local_dir))


@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def validate_model(model: FlyteDirectory, tokenizer: str, validation_data: str) -> bool:
    print(validation_data)
    model_path = os.path.join(model, "model")
    with open(model_path, "r", encoding="utf-8") as f:
        return f.read() == tokenizer


@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def all_good(validations: List[bool]) -> bool:
    return all(validations)


@workflow
def train_tokenizer_and_model() -> Tuple[FlyteDirectory, str]:
    tokenizer = train_tokenizer()
    model = train_model(tokenizer=tokenizer)
    return model, tokenizer


@workflow
def just_train_model(tokenizer: str) -> Tuple[FlyteDirectory, str]:
    model = train_model(tokenizer=tokenizer)
    return model, tokenizer


@workflow
def train(tokenizer: str = "") -> bool:
    stt = should_train_tokenizer(tokenizer=tokenizer)
    model, t = (
        conditional("train_tokenizer")
        .if_(stt.is_true())
        .then(train_tokenizer_and_model())
        .else_()
        .then(just_train_model(tokenizer=tokenizer))
    )
    validation_task = partial(validate_model, model=model, tokenizer=t)
    validations = map_task(
        validation_task,
        concurrency=2,
    )(validation_data=["foo", "bar", "baz"])
    return all_good(validations=validations)

I used the following command to run the workflow remotely.

pyflyte -v --config ../config.yaml run --remote train_model_workflow.py train --tokenizer Deb

image

As you can see in the above screenshot, the validate model map task fails. This is the log I can see in the corresponding pod

tar: Removing leading `/' from member names
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /usr/local/bin/pyflyte-map-execute:8 in <module>                             │
│                                                                              │
│ ❱ 8 │   sys.exit(map_execute_task_cmd())                                     │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/click/core.py:1157 in __call__        │
│                                                                              │
│ ❱ 1157 │   │   return self.main(*args, **kwargs)                             │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/click/core.py:1078 in main            │
│                                                                              │
│ ❱ 1078 │   │   │   │   │   rv = self.invoke(ctx)                             │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/click/core.py:1434 in invoke          │
│                                                                              │
│ ❱ 1434 │   │   │   return ctx.invoke(self.callback, **ctx.params)            │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/click/core.py:783 in invoke           │
│                                                                              │
│ ❱  783 │   │   │   │   return __callback(*args, **kwargs)                    │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/bin/entrypoint.py:577 in     │
│ map_execute_task_cmd                                                         │
│                                                                              │
│ ❱ 577 │   _execute_map_task(                                                 │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py:143 in  │
│ f                                                                            │
│                                                                              │
│ ❱ 143 │   │   │   return outer_f(inner_f, args, kwargs)                      │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py:173 in  │
│ system_entry_point                                                           │
│                                                                              │
│ ❱ 173 │   │   │   │   return wrapped(*args, **kwargs)                        │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/bin/entrypoint.py:418 in     │
│ _execute_map_task                                                            │
│                                                                              │
│ ❱ 418 │   │   map_task = mtr.load_task(loader_args=resolver_args, max_concur │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/utils.py:309 in wrapper │
│                                                                              │
│ ❱ 309 │   │   │   │   return func(*args, **kwargs)                           │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/map_task.py:401 in      │
│ load_task                                                                    │
│                                                                              │
│ ❱ 401 │   │   _task_def = resolver_obj.load_task(loader_args=resolver_args)  │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/utils.py:309 in wrapper │
│                                                                              │
│ ❱ 309 │   │   │   │   return func(*args, **kwargs)                           │
│                                                                              │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/python_auto_container.p │
│ y:248 in load_task                                                           │
│                                                                              │
│ ❱ 248 │   │   task_module = importlib.import_module(name=task_module)  # typ │
│                                                                              │
│ /usr/local/lib/python3.9/importlib/__init__.py:127 in import_module          │
│                                                                              │
│ ❱ 127 │   return _bootstrap._gcd_import(name[level:], package, level)        │
│ in _gcd_import:1030                                                          │
│ in _find_and_load:1007                                                       │
│ in _find_and_load_unlocked:986                                               │
│ in _load_unlocked:680                                                        │
│ in exec_module:850                                                           │
│ in _call_with_frames_removed:228                                             │
│                                                                              │
│ /my_workflow/train_model_workflow.py:18 in <module>                          │
│                                                                              │
│ ❱  18 from my_workflow.my_module.my_module import random                     │
╰──────────────────────────────────────────────────────────────────────────────╯
ModuleNotFoundError: No module named 'my_workflow.my_module'
Getting s3://metadata/flytesnacks/development/ZRPWNGFGX2H5F5QHTYL7AZSPOY======/script_mode.tar.gz to ./

I cannot understand why the should_train_tokenizer task was executed successfully, but the map task failed.

command for the failed task,

pyflyte-fast-execute --additional-distribution s3://metadata/flytesnacks/development/ZRPWNGFGX2H5F5QHTYL7AZSPOY======/script_mode.tar.gz --dest-dir . -- pyflyte-map-execute --inputs s3://metadata/metadata/propeller/flytesnacks-development-ac6x425ghlsgbkl7gcg5/n2/data/inputs.pb --output-prefix s3://metadata/metadata/propeller/flytesnacks-development-ac6x425ghlsgbkl7gcg5/n2/data/0 --raw-output-data-prefix s3://userdata/data/31/ac6x425ghlsgbkl7gcg5-n2-0/1/0 --checkpoint-path s3://userdata/data/31/ac6x425ghlsgbkl7gcg5-n2-0/1/0/_flytecheckpoints --prev-checkpoint "" --resolver MapTaskResolver -- vars model,tokenizer resolver flytekit.core.python_auto_container.default_task_resolver task-module my_workflow.train_model_workflow task-name validate_model

Expected behavior

The map task should not fail.

Additional context to reproduce

No response

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • [X] Yes

Have you read the Code of Conduct?

  • [X] Yes

debajyoti-truefoundry avatar Feb 07 '24 12:02 debajyoti-truefoundry

Thank you for opening your first issue here! 🛠

welcome[bot] avatar Feb 07 '24 12:02 welcome[bot]

🤖

Hey @debajyoti-truefoundry, great to see you back! Hope you're doing well.

Based on the information you've provided, it seems like the issue might be related to the way your modules are being imported or located in the pyflyte-map-execute mode. The pyflyte-map-execute mode should not change the way Python handles imports, so it's possible that the issue might be related to the file structure or the way the modules are being referenced.

The error message ModuleNotFoundError: No module named 'my_workflow.my_module' suggests that Python is unable to find the module my_module in the my_workflow package. This could be due to a number of reasons:

  1. The my_workflow package or the my_module module does not exist in the location where Python is looking for it.
  2. The my_workflow package is not properly installed or is not in the Python path.
  3. The my_workflow package does not contain an __init__.py file, which is required for Python to recognize it as a package.

Here are a few things you could try:

  • Check the file structure and make sure that the my_workflow package and the my_module module exist in the expected locations.
  • Make sure that the my_workflow package is properly installed and is in the Python path. You can check the Python path by running import sys; print(sys.path) in your Python environment.
  • Check if the my_workflow package contains an __init__.py file. If it doesn't, you can create one (it can be an empty file).

If none of these suggestions solve the issue, it would be helpful if you could provide more information about your file structure and how you're importing the my_workflow package and the my_module module. This would help in identifying the cause of the issue.

For more information on how Python imports work, you can refer to the Python documentation on modules.

I hope this helps! If you have any more questions or need further clarification, feel free to ask.

Sources


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

dosubot[bot] avatar Feb 07 '24 12:02 dosubot[bot]

I did not face this problem when I registered the workflow and ran from UI.

pyflyte --pkgs my_workflow package -f --source .

flytectl register files \
    --project flytesnacks \
    --domain development \
    --archive flyte-package.tgz \
    --version "$(git rev-parse HEAD)"

image

This is the command that was used at that time in the Pod.

pyflyte-map-execute --inputs s3://metadata/metadata/propeller/flytesnacks-development-abkhjcvsfw42kvttmzkv/n2/data/inputs.pb --output-prefix s3://metadata/metadata/propeller/flytesnacks-development-abkhjcvsfw42kvttmzkv/n2/data/0 --raw-output-data-prefix s3://userdata/data/q1/abkhjcvsfw42kvttmzkv-n2-0/1/0 --checkpoint-path s3://userdata/data/q1/abkhjcvsfw42kvttmzkv-n2-0/1/0/_flytecheckpoints --prev-checkpoint "" --resolver MapTaskResolver -- vars model,tokenizer resolver flytekit.core.python_auto_container.default_task_resolver task-module my_workflow.train_model_workflow task-name validate_model

debajyoti-truefoundry avatar Feb 07 '24 13:02 debajyoti-truefoundry