[BUG] Task fails to import modules in `pyflyte-map-execute` mode but works in `pyflyte-execute` mode
Describe the bug
~/work/pipelines/.. │I 1 import os ■ Missing module docstring
.git │ 2 from functools import partial
flyte │ 3 from pathlib import Path
installation │ 4 from typing import List, Tuple
my_workflow │ 5
__pycache__ │E 6 import flytekit ■ Unable to import 'flytekit'
my_module │E 7 from flytekit import ( ■ Unable to import 'flytekit'
__pycache__ │ 8 ImageSpec,
__init__.py │ 9 PodTemplate,
my_module.py │ 10 Resources,
__init__.py │ 11 conditional,
register.sh │ 12 map_task,
run.sh │ 13 task,
train_model_workflow│ 14 workflow,
.gitignore │ 15 )
config.yaml │E 16 from flytekit.types.directory import FlyteDirectory ■ Unable to import 'flytekit.types.directory'
requirements.txt │E 17 from kubernetes import client ■ Unable to import 'kubernetes'
│ 18 from my_workflow.my_module.my_module import random
This is what my current folder structure looks like, and on line 18, I have imported a module.
My workflow file looks like this,
import os
from functools import partial
from pathlib import Path
from typing import List, Tuple
import flytekit
from flytekit import (
ImageSpec,
PodTemplate,
Resources,
conditional,
map_task,
task,
workflow,
)
from flytekit.types.directory import FlyteDirectory
from kubernetes import client
from my_workflow.my_module.my_module import random
# Image used by the GPU task (`train_model`): same base image as
# `normal_image`, with `pynvml` added so the task can query NVML for devices.
nvml_image_spec = ImageSpec(
    base_image="python:3.9-slim",
    packages=["flytekit==1.10.3", "pynvml==11.5.0"],
    registry="redacted",
    name="redacted-pytorch",
    # NOTE(review): ".." is relative to where pyflyte is invoked — presumably
    # the repository root that contains the `my_workflow` package; confirm.
    source_root="..",
)

# Image used by every CPU-only task in this file.
normal_image = ImageSpec(
    base_image="python:3.9-slim",
    packages=["flytekit==1.10.3"],
    registry="redacted",
    name="redacted-normal",
    source_root="..",
)

# Resource settings; each task below passes the same object as both
# `requests` and `limits`.
cpu_resource = Resources(mem="300Mi", cpu="0.1")
gpu_resource = Resources(mem="900Mi", cpu="0.1", gpu="1")

# Pod template for the GPU task: a required node-affinity rule that only
# matches nodes whose `karpenter.k8s.aws/instance-family` label is `g4dn`.
gpu_pod_template = PodTemplate(
    pod_spec=client.V1PodSpec(
        containers=[],
        affinity=client.V1Affinity(
            node_affinity=client.V1NodeAffinity(
                required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
                    node_selector_terms=[
                        client.V1NodeSelectorTerm(
                            match_expressions=[
                                client.V1NodeSelectorRequirement(
                                    key="karpenter.k8s.aws/instance-family",
                                    operator="In",
                                    values=["g4dn"],
                                )
                            ]
                        )
                    ]
                )
            )
        ),
    )
)
@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def should_train_tokenizer(tokenizer: str) -> bool:
    """Return True when no tokenizer was supplied (i.e. the string is empty)."""
    return not tokenizer
@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def train_tokenizer() -> str:
    """Stand-in for tokenizer training; always yields the same placeholder name."""
    tokenizer_name = "trained_tokenizer"
    return tokenizer_name
@task(
    container_image=nvml_image_spec,
    requests=gpu_resource,
    limits=gpu_resource,
    pod_template=gpu_pod_template,
    # accelerator=flytekit.extras.accelerators.T4,
)
def train_model(tokenizer: str) -> FlyteDirectory:
    """Check that a GPU is visible, then write a placeholder "model" directory.

    The "model" is a single file named ``model`` whose content is the
    tokenizer string; it lives in a ``csv_files`` directory under the task's
    working directory, and that directory is returned as a ``FlyteDirectory``.
    """
    from pynvml import nvmlDeviceGetCount, nvmlInit

    # Fail early when NVML reports no devices on the node.
    nvmlInit()
    assert nvmlDeviceGetCount() > 0

    # Exercises the name imported from `my_workflow.my_module.my_module`.
    random.random()

    output_dir = Path(flytekit.current_context().working_directory) / "csv_files"
    output_dir.mkdir(exist_ok=True)
    (output_dir / "model").write_text(tokenizer, encoding="utf-8")
    return FlyteDirectory(path=str(output_dir))
@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def validate_model(model: FlyteDirectory, tokenizer: str, validation_data: str) -> bool:
    """Return True when the ``model`` file inside *model* contains exactly *tokenizer*.

    *validation_data* is only printed; it is the input the map task fans out over.
    """
    print(validation_data)
    with open(os.path.join(model, "model"), "r", encoding="utf-8") as model_file:
        stored_tokenizer = model_file.read()
    return stored_tokenizer == tokenizer
@task(requests=cpu_resource, limits=cpu_resource, container_image=normal_image)
def all_good(validations: List[bool]) -> bool:
    """Return True only if every validation result is truthy (True for an empty list)."""
    for passed in validations:
        if not passed:
            return False
    return True
@workflow
def train_tokenizer_and_model() -> Tuple[FlyteDirectory, str]:
    """Train a new tokenizer, train a model with it, and return both."""
    new_tokenizer = train_tokenizer()
    trained_model = train_model(tokenizer=new_tokenizer)
    return trained_model, new_tokenizer
@workflow
def just_train_model(tokenizer: str) -> Tuple[FlyteDirectory, str]:
    """Train a model with the supplied tokenizer and return the model with that tokenizer."""
    return train_model(tokenizer=tokenizer), tokenizer
@workflow
def train(tokenizer: str = "") -> bool:
    """Top-level workflow: obtain a model and tokenizer, then validate via a map task.

    If *tokenizer* is empty a tokenizer is trained first; otherwise the given
    one is used as-is. The resulting model is validated once per validation
    item and the workflow returns whether every validation passed.
    """
    stt = should_train_tokenizer(tokenizer=tokenizer)
    # Both branches produce a (model, tokenizer) pair, so downstream nodes do
    # not depend on which branch ran.
    model, t = (
        conditional("train_tokenizer")
        .if_(stt.is_true())
        .then(train_tokenizer_and_model())
        .else_()
        .then(just_train_model(tokenizer=tokenizer))
    )
    # Bind the inputs shared by every mapped instance; only `validation_data`
    # is left to be mapped over.
    validation_task = partial(validate_model, model=model, tokenizer=t)
    # This is the map task that fails with ModuleNotFoundError in the report.
    validations = map_task(
        validation_task,
        concurrency=2,
    )(validation_data=["foo", "bar", "baz"])
    return all_good(validations=validations)
I used the following command to run the workflow remotely.
pyflyte -v --config ../config.yaml run --remote train_model_workflow.py train --tokenizer Deb
As you can see in the above screenshot, the `validate_model` map task fails. This is the log I can see in the corresponding pod:
tar: Removing leading `/' from member names
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /usr/local/bin/pyflyte-map-execute:8 in <module> │
│ │
│ ❱ 8 │ sys.exit(map_execute_task_cmd()) │
│ │
│ /usr/local/lib/python3.9/site-packages/click/core.py:1157 in __call__ │
│ │
│ ❱ 1157 │ │ return self.main(*args, **kwargs) │
│ │
│ /usr/local/lib/python3.9/site-packages/click/core.py:1078 in main │
│ │
│ ❱ 1078 │ │ │ │ │ rv = self.invoke(ctx) │
│ │
│ /usr/local/lib/python3.9/site-packages/click/core.py:1434 in invoke │
│ │
│ ❱ 1434 │ │ │ return ctx.invoke(self.callback, **ctx.params) │
│ │
│ /usr/local/lib/python3.9/site-packages/click/core.py:783 in invoke │
│ │
│ ❱ 783 │ │ │ │ return __callback(*args, **kwargs) │
│ │
│ /usr/local/lib/python3.9/site-packages/flytekit/bin/entrypoint.py:577 in │
│ map_execute_task_cmd │
│ │
│ ❱ 577 │ _execute_map_task( │
│ │
│ /usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py:143 in │
│ f │
│ │
│ ❱ 143 │ │ │ return outer_f(inner_f, args, kwargs) │
│ │
│ /usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py:173 in │
│ system_entry_point │
│ │
│ ❱ 173 │ │ │ │ return wrapped(*args, **kwargs) │
│ │
│ /usr/local/lib/python3.9/site-packages/flytekit/bin/entrypoint.py:418 in │
│ _execute_map_task │
│ │
│ ❱ 418 │ │ map_task = mtr.load_task(loader_args=resolver_args, max_concur │
│ │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/utils.py:309 in wrapper │
│ │
│ ❱ 309 │ │ │ │ return func(*args, **kwargs) │
│ │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/map_task.py:401 in │
│ load_task │
│ │
│ ❱ 401 │ │ _task_def = resolver_obj.load_task(loader_args=resolver_args) │
│ │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/utils.py:309 in wrapper │
│ │
│ ❱ 309 │ │ │ │ return func(*args, **kwargs) │
│ │
│ /usr/local/lib/python3.9/site-packages/flytekit/core/python_auto_container.p │
│ y:248 in load_task │
│ │
│ ❱ 248 │ │ task_module = importlib.import_module(name=task_module) # typ │
│ │
│ /usr/local/lib/python3.9/importlib/__init__.py:127 in import_module │
│ │
│ ❱ 127 │ return _bootstrap._gcd_import(name[level:], package, level) │
│ in _gcd_import:1030 │
│ in _find_and_load:1007 │
│ in _find_and_load_unlocked:986 │
│ in _load_unlocked:680 │
│ in exec_module:850 │
│ in _call_with_frames_removed:228 │
│ │
│ /my_workflow/train_model_workflow.py:18 in <module> │
│ │
│ ❱ 18 from my_workflow.my_module.my_module import random │
╰──────────────────────────────────────────────────────────────────────────────╯
ModuleNotFoundError: No module named 'my_workflow.my_module'
Getting s3://metadata/flytesnacks/development/ZRPWNGFGX2H5F5QHTYL7AZSPOY======/script_mode.tar.gz to ./
I cannot understand why the should_train_tokenizer task was executed successfully, but the map task failed.
Command for the failed task:
pyflyte-fast-execute --additional-distribution s3://metadata/flytesnacks/development/ZRPWNGFGX2H5F5QHTYL7AZSPOY======/script_mode.tar.gz --dest-dir . -- pyflyte-map-execute --inputs s3://metadata/metadata/propeller/flytesnacks-development-ac6x425ghlsgbkl7gcg5/n2/data/inputs.pb --output-prefix s3://metadata/metadata/propeller/flytesnacks-development-ac6x425ghlsgbkl7gcg5/n2/data/0 --raw-output-data-prefix s3://userdata/data/31/ac6x425ghlsgbkl7gcg5-n2-0/1/0 --checkpoint-path s3://userdata/data/31/ac6x425ghlsgbkl7gcg5-n2-0/1/0/_flytecheckpoints --prev-checkpoint "" --resolver MapTaskResolver -- vars model,tokenizer resolver flytekit.core.python_auto_container.default_task_resolver task-module my_workflow.train_model_workflow task-name validate_model
Expected behavior
The map task should not fail.
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
- [X] Yes
Have you read the Code of Conduct?
- [X] Yes
Thank you for opening your first issue here! 🛠
🤖
Hey @debajyoti-truefoundry, great to see you back! Hope you're doing well.
Based on the information you've provided, it seems like the issue might be related to the way your modules are being imported or located in the pyflyte-map-execute mode. The pyflyte-map-execute mode should not change the way Python handles imports, so it's possible that the issue might be related to the file structure or the way the modules are being referenced.
The error message ModuleNotFoundError: No module named 'my_workflow.my_module' suggests that Python is unable to find the module my_module in the my_workflow package. This could be due to a number of reasons:
- The `my_workflow` package or the `my_module` module does not exist in the location where Python is looking for it.
- The `my_workflow` package is not properly installed or is not in the Python path.
- The `my_workflow` package does not contain an `__init__.py` file, which is required for Python to recognize it as a package.
Here are a few things you could try:
- Check the file structure and make sure that the `my_workflow` package and the `my_module` module exist in the expected locations.
- Make sure that the `my_workflow` package is properly installed and is in the Python path. You can check the Python path by running `import sys; print(sys.path)` in your Python environment.
- Check if the `my_workflow` package contains an `__init__.py` file. If it doesn't, you can create one (it can be an empty file).
If none of these suggestions solve the issue, it would be helpful if you could provide more information about your file structure and how you're importing the my_workflow package and the my_module module. This would help in identifying the cause of the issue.
For more information on how Python imports work, you can refer to the Python documentation on modules.
I hope this helps! If you have any more questions or need further clarification, feel free to ask.
Sources
This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.
Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.
If you want to continue the conversation, start your reply with @dosu-bot.
I did not face this problem when I registered the workflow and ran from UI.
pyflyte --pkgs my_workflow package -f --source .
flytectl register files \
--project flytesnacks \
--domain development \
--archive flyte-package.tgz \
--version "$(git rev-parse HEAD)"
This is the command that was used at that time in the Pod.
pyflyte-map-execute --inputs s3://metadata/metadata/propeller/flytesnacks-development-abkhjcvsfw42kvttmzkv/n2/data/inputs.pb --output-prefix s3://metadata/metadata/propeller/flytesnacks-development-abkhjcvsfw42kvttmzkv/n2/data/0 --raw-output-data-prefix s3://userdata/data/q1/abkhjcvsfw42kvttmzkv-n2-0/1/0 --checkpoint-path s3://userdata/data/q1/abkhjcvsfw42kvttmzkv-n2-0/1/0/_flytecheckpoints --prev-checkpoint "" --resolver MapTaskResolver -- vars model,tokenizer resolver flytekit.core.python_auto_container.default_task_resolver task-module my_workflow.train_model_workflow task-name validate_model