[BUG] Task config should be used when computing the task version hash
Describe the bug
- Run
pyflyte run --remoteon this file, twice.
from flytekit import task, workflow
from flytekit.core.base_task import PythonTask
from flytekit.core.interface import Interface
import rich_click as click
from dataclasses import dataclass
from typing import Optional
import datetime
@dataclass
class MyConfig(object):
MyField: Optional[str] = None
class MyTask(PythonTask[MyConfig]):
def __init__(
self,
name: str,
my_config: MyConfig,
**kwargs,
):
my_config.MyField = str(datetime.datetime.now())
click.secho(f"Resolved my_config: {my_config}", fg="cyan")
super().__init__(
name=name,
task_type="my_task",
interface=Interface(),
task_config=my_config,
**kwargs,
)
@workflow
def wf2():
job = MyTask(
name="test",
my_config=MyConfig(),
)
return job()
- The output looks similar to below. The version string for each job is identical.
$ pyflyte run --remote wf2.py wf2
Running Execution on Remote.
Resolved job properties: MyConfig(MyField='2024-05-14 22:03:53.646933')
[β] Go to https://.../f6562c3c440124ee3a76 to see execution in the console.
$ pyflyte run --remote wf2.py wf2
Running Execution on Remote.
Resolved my_config: MyConfig(MyField='2024-05-14 22:09:07.348125')
[β] Go to https://.../f72aa991349d041f499b to see execution in the console.
Expected behavior
Task version strings should be different because the task configs are different.
Especially when used in a custom flyte agent that has more complex logic, this will lead to an error like:
RPC Failed, with Status: StatusCode.INVALID_ARGUMENT
details: task with different structure already exists:
- /template/custom/myField: abc -> def
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
- [X] Yes
Have you read the Code of Conduct?
- [X] Yes
Thank you for opening your first issue here! π
The issue you're encountering is due to Flyte not considering changes in task configuration when computing the task version hash. This can lead to errors when the task configuration changes but the version remains the same.
To address this, you can manually specify a version when registering the task to ensure that changes in the configuration are recognized. Here's a modified version of your script that includes a version parameter:
from flytekit import task, workflow
from flytekit.core.base_task import PythonTask
from flytekit.core.interface import Interface
import rich_click as click
from dataclasses import dataclass
from typing import Optional
import datetime
@dataclass
class MyConfig(object):
MyField: Optional[str] = None
class MyTask(PythonTask[MyConfig]):
def __init__(
self,
name: str,
my_config: MyConfig,
version: str,
**kwargs,
):
my_config.MyField = str(datetime.datetime.now())
click.secho(f"Resolved my_config: {my_config}", fg="cyan")
super().__init__(
name=name,
task_type="my_task",
interface=Interface(),
task_config=my_config,
version=version,
**kwargs,
)
@workflow
def wf2():
job = MyTask(
name="test",
my_config=MyConfig(),
version="v1.0.0" # Change this version manually to reflect changes
)
return job()
Would you like more details on how to handle task versioning in Flyte?
Sources:
- Hi I occasionally am getting RPC Failed with Status StatusCode.INVALID_ARGUMENT
- Hi all, I have a script with a few tasks and the workflow def
- Hey, I'm having trouble registering a workflow, it looks like
This is an AI-generated response and your feedback is appreciated! Please leave a π if this is helpful and π if it is not.
Mention @runllm to ask follow-up questions. See runllm.com for more!
@ddl-rliu - this is a controversial topic. Maybe we need to add that as a config? As folks change mem/cpu but often want it to be cached. I do think this might be interesting and in this case, i would simply hash the entire tasktemplate spec
cc @EngHabu / @eapolinario / @wild-endeavor / @cosmicBboy what do you folks think?
Right, that makes sense. Since values like mem/cpu are variable inputs, it's actually desirable for them to not cause changes to the version string. What might be better is, for our case, for us to move these kinds of fields out of the custom field, and into another field that's suited for variable input-type fields. (rather than this idea of using the task config to compute the version hash)
Maybe we need to add that as a config?
Another idea that I thought of is to extend Enable flytekit to be pluggable βΒ and extend the version hash calculation to be pluggable. Then interested parties can write a plugin to add the task config into the version hash logic.