[Core feature] Flytekit should support using output with `Non-Any` type as the input with `Any` type.
Motivation: Why do you think this is important?
Intuitively, the `Any` type should accept every possible type as input. Flytekit today supports passing any Python value directly to an `Any` input, but it doesn't support:
- a non-Any output from another task.
- a non-Any input from the workflow.
Goal: What should the final outcome look like, ideally?
The following scenarios should succeed:
Scenario 1:
from typing import Any

from flytekit import task, workflow

@task()
def foo(a: Any) -> int:
    if type(a) == int:
        return a + 1
    return 0

@workflow
def wf(a: int) -> int:
    return foo(a=a)
Scenario 2:
@workflow
def wf1(a: Any) -> int:
    a1 = foo(a=a)
    return foo(a=a1)
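Both scenarios come down to one binding rule: a parameter annotated `Any` should accept a value of any declared type, while every other parameter still requires a matching type. The sketch below models that rule in plain Python. `is_bindable` is a hypothetical helper written only for illustration; it is not Flytekit's type checking and uses no Flytekit API.

```python
from typing import Any


def is_bindable(upstream: Any, downstream: Any) -> bool:
    """Return True if a value declared as `upstream` may feed a parameter
    declared as `downstream`. Hypothetical helper, not part of Flytekit."""
    # A downstream `Any` accepts every upstream type (the behavior requested here).
    if downstream is Any:
        return True
    # Otherwise require the declared types to be identical.
    return upstream is downstream


# Scenario 1: workflow input `a: int` feeds task input `a: Any`.
print(is_bindable(int, Any))
# Scenario 2: workflow input `a: Any` feeds task input `a: Any`,
# then the task output `int` feeds task input `a: Any`.
print(is_bindable(Any, Any))
print(is_bindable(int, Any))
# The reverse direction is not implied by this request.
print(is_bindable(Any, int))
```

The rule is deliberately one-directional: widening into `Any` is allowed, but an `Any` value is not silently narrowed to `int`.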
Describe alternatives you've considered
Adding some tags as type hints for the transformer?
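One possible reading of the "tags" idea is to attach a marker to the annotation with `typing.Annotated`, so that a transformer could detect it. This is a minimal sketch under that assumption: the marker class `AcceptsAnyInput` and the helper `accepts_any_input` are hypothetical names that do not exist in Flytekit, and the issue does not say which design was intended.

```python
from typing import Annotated, Any, get_args, get_origin, get_type_hints


class AcceptsAnyInput:
    """Hypothetical marker tag; not part of Flytekit."""


def accepts_any_input(func, param: str) -> bool:
    """Return True if `param` of `func` carries the AcceptsAnyInput tag."""
    # include_extras=True keeps the Annotated metadata instead of stripping it.
    hint = get_type_hints(func, include_extras=True)[param]
    if get_origin(hint) is not Annotated:
        return False
    # For Annotated hints, get_args returns (base_type, *metadata).
    return any(isinstance(meta, AcceptsAnyInput) for meta in get_args(hint)[1:])


def foo(a: Annotated[Any, AcceptsAnyInput()]) -> int:
    return a + 1 if isinstance(a, int) else 0


def bar(a: int) -> int:
    return a


print(accepts_any_input(foo, "a"))
print(accepts_any_input(bar, "a"))
```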
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
- [X] Yes
Have you read the Code of Conduct?
- [X] Yes
To support using output with Non-Any type as the input with Any type in Flytekit, you can leverage custom type transformers. This allows Flytekit to handle various types dynamically. Here's how you can achieve this:
- Define a Custom Type Transformer: Create a type transformer that can handle the `Any` type and register it with Flytekit.
- Implement the Workflow and Task: Use the custom transformer in your workflow and task definitions.
Here's an example implementation:
import json
from typing import Any

from flytekit import task, workflow
from flytekit.core.type_engine import TypeEngine, TypeTransformer
from flytekit.models import types as _type_models
from flytekit.models.core.types import BlobType
from flytekit.models.literals import Literal, Scalar, Blob, BlobMetadata

# Single JSON blob; shared so the declared literal type matches the literal produced.
JSON_BLOB = BlobType(format="json", dimensionality=BlobType.BlobDimensionality.SINGLE)

class AnyTypeTransformer(TypeTransformer):
    def __init__(self):
        super().__init__(name="AnyType", t=object)

    def get_literal_type(self, t):
        return _type_models.LiteralType(blob=JSON_BLOB)

    def to_literal(self, ctx, python_val, python_type, expected):
        # Write the value to a local JSON file, then upload it.
        # python_val must be JSON-serializable.
        local_file = ctx.file_access.get_random_local_path() + ".json"
        with open(local_file, "w") as f:
            json.dump(python_val, f)
        remote_file = ctx.file_access.get_random_remote_path()
        ctx.file_access.upload(local_file, remote_file)
        return Literal(scalar=Scalar(blob=Blob(uri=remote_file, metadata=BlobMetadata(type=JSON_BLOB))))

    def to_python_value(self, ctx, lv, expected_python_type):
        # Download the JSON blob and decode it back into a Python value.
        local_file = ctx.file_access.get_random_local_path() + ".json"
        ctx.file_access.download(lv.scalar.blob.uri, local_file)
        with open(local_file, "r") as f:
            return json.load(f)

TypeEngine.register(AnyTypeTransformer())

@task
def foo(a: Any) -> int:
    if isinstance(a, int):
        return a + 1
    return 0

@workflow
def wf(a: int) -> int:
    return foo(a=a)

@workflow
def wf1(a: Any) -> int:
    a1 = foo(a=a)
    return foo(a=a1)
This code defines a custom type transformer for Any type and registers it with Flytekit. The workflows wf and wf1 demonstrate how to use this transformer.
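Because the transformer above serializes values with `json.dump` and restores them with `json.load`, only JSON-serializable values survive the trip, and some come back as a different type. The sketch below reproduces just that file round trip with the standard library, with no Flytekit involved. `round_trip` is a hypothetical helper name used only for this illustration.

```python
import json
import os
import tempfile
from typing import Any


def round_trip(value: Any) -> Any:
    """Write `value` to a JSON file and read it back, mirroring the
    json.dump / json.load pair in the transformer (without upload or download)."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "value.json")
        with open(path, "w") as f:
            json.dump(value, f)
        with open(path, "r") as f:
            return json.load(f)


print(round_trip(41))      # ints come back unchanged
print(round_trip((1, 2)))  # tuples come back as lists
try:
    round_trip({1, 2})     # sets are not JSON-serializable
except TypeError as err:
    print("TypeError:", err)
```

A JSON-based transformer therefore cannot stand in for `Any` in general; values such as sets or arbitrary class instances would need a different serialization.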
Would you like more details on any specific part?
This is an AI-generated response.
Related to https://github.com/flyteorg/flyte/issues/5319.
This is important for my LangChain Agent PR, so I will take it over.
NP