flyte icon indicating copy to clipboard operation
flyte copied to clipboard

[Core feature] Flytekit should support using output with `Non-Any` type as the input with `Any` type.

Open Mecoli1219 opened this issue 1 year ago • 5 comments

Motivation: Why do you think this is important?

Intuitively, Any type should take every possible type as the input. Flytekit today supports using all Python types as the input, but it doesn't support

  1. the non-Any output from another task.
  2. the non-Any input from the workflow.

Goal: What should the final outcome look like, ideally?

The following scenarios should succeed:

Scenario 1:

@task()
def foo(a: Any) -> int:
    if type(a) == int:
        return a + 1
    return 0

@workflow
def wf(a: int) -> int:
    return foo(a=a)

Scenario 2:

@workflow
def wf1(a: Any) -> int:
    a1 = foo(a=a)
    return foo(a=a1)

Describe alternatives you've considered

Adding some tags as type hints for the transformer?

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

  • [X] Yes

Have you read the Code of Conduct?

  • [X] Yes

Mecoli1219 avatar May 15 '24 06:05 Mecoli1219

Thank you for opening your first issue here! 🛠

welcome[bot] avatar May 15 '24 06:05 welcome[bot]

To support using output with Non-Any type as the input with Any type in Flytekit, you can leverage custom type transformers. This allows Flytekit to handle various types dynamically. Here's how you can achieve this:

  1. Define a Custom Type Transformer: Create a type transformer that can handle Any type and register it with Flytekit.
  2. Implement the Workflow and Task: Use the custom transformer in your workflow and task definitions.

Here's an example implementation:

from flytekit import task, workflow, TypeEngine, context_manager
from flytekit.types.structured import StructuredDataset
from flytekit.models import types as _type_models
from flytekit.models.literals import Literal, Scalar, Blob, BlobMetadata

class AnyTypeTransformer(TypeEngine.TypeTransformer):
    def __init__(self):
        super().__init__(name="AnyType", t=object)

    def get_literal_type(self, t):
        return _type_models.LiteralType(simple=_type_models.SimpleType.STRUCT)

    def to_literal(self, ctx, python_val, python_type, expected):
        local_file = ctx.file_access.get_random_local_path() + ".json"
        with open(local_file, "w") as f:
            json.dump(python_val, f)
        remote_file = ctx.file_access.get_random_remote_path()
        ctx.file_access.upload(local_file, remote_file)
        return Literal(scalar=Scalar(blob=Blob(uri=remote_file, metadata=BlobMetadata(type=self.get_literal_type(python_type)))))

    def to_python_value(self, ctx, lv, expected_python_type):
        local_file = ctx.file_access.get_random_local_path() + ".json"
        ctx.file_access.download(lv.scalar.blob.uri, local_file)
        with open(local_file, "r") as f:
            return json.load(f)

TypeEngine.register(AnyTypeTransformer())

@task
def foo(a: Any) -> int:
    if isinstance(a, int):
        return a + 1
    return 0

@workflow
def wf(a: int) -> int:
    return foo(a=a)

@workflow
def wf1(a: Any) -> int:
    a1 = foo(a=a)
    return foo(a=a1)

This code defines a custom type transformer for Any type and registers it with Flytekit. The workflows wf and wf1 demonstrate how to use this transformer.

Would you like more details on any specific part?

Sources:


This is an AI-generated response and your feedback is appreciated! Please leave a 👍 if this is helpful and 👎 if it is not.

Mention @runllm to ask follow-up questions. See runllm.com for more!

runllm[bot] avatar May 15 '24 06:05 runllm[bot]

Related to https://github.com/flyteorg/flyte/issues/5319.

eapolinario avatar May 16 '24 17:05 eapolinario

This is important to my LangChain Agent PR, so I will take over it.

Future-Outlier avatar May 20 '24 04:05 Future-Outlier

NP

Mecoli1219 avatar May 20 '24 04:05 Mecoli1219