[fix] Add blob typechecker
Tracking issue
Likely similar issue to https://github.com/flyteorg/flyte/issues/2864
Repro idea: save the following as wf.py, then run: pyflyte run --remote wf.py wf
import csv
from pathlib import Path
from typing import TypeVar

import flytekit
from flytekit import task, workflow
from flytekit.types.file import FlyteFile


@task(cache=True, cache_version="3.0")
def t1() -> FlyteFile[TypeVar("csv")]:
    # Write a header-only CSV into the task's working directory.
    columns_to_normalize = ["a", "b"]
    out_path = Path(flytekit.current_context().working_directory) / "normalized.csv"
    with out_path.open(mode="w") as output_file:
        writer = csv.DictWriter(output_file, fieldnames=columns_to_normalize)
        writer.writeheader()
    print(columns_to_normalize)
    return FlyteFile(path=out_path)


@task(cache=True, cache_version="3.0")
def t2(ff: FlyteFile[TypeVar("csv")]) -> FlyteFile[TypeVar("csv")]:
    # Pass the file straight through, so the cached output is the upstream file.
    print("hello")
    return ff


@workflow
def wf() -> FlyteFile[TypeVar("csv")]:
    return t2(ff=t1())


if __name__ == "__main__":
    wf()
Why are the changes needed?
An error occurs whenever a task's FlyteFile[TypeVar("csv")] output is passed as an input to another task while caching is enabled.
flytepropeller logs the following error when caching is on for a FlyteFile type:
{"json":{"exec_id":"legally-working-squirrel-zel6-zrvvdwz7jrrf79cwm44j","node":"n0","ns":"domino-compute","res_ver":"73517980","routine":"worker-1","wf":"6668e7c9d14c4306e8cc54d1:development:workflow.training_workflow_cache"},"level":"error","msg":"DataCatalog failed to get outputs from artifact adbf03db-daea-4108-9f8f-c352ac395a6b, err: unexpected artifactData: [processed_data] type: [blob:{}] does not match any task output type: [blob:{format:\"csv\"}]","ts":"2024-06-26T00:05:39Z"}
What changes were proposed in this pull request?
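I believe these errors happen because the trivialChecker is used for blob types: as the log above shows, the cached artifact's type blob:{} (no format) is rejected against the declared task output type blob:{format:"csv"}. This PR adds a blobtypechecker so that blob types get their own compatibility check.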
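To illustrate the mismatch, here is a minimal Python sketch of a strict comparison next to a blob-aware one. It is not the flytepropeller code (which is written in Go). BlobType, strict_match, and blob_casts_from are hypothetical names used only for this illustration. The rule that an empty format acts as a wildcard is an assumption about what a blob-specific checker could do, not a statement of what this PR implements.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BlobType:
    """Hypothetical stand-in for a blob literal type (illustration only)."""

    format: str = ""  # "" means no format was recorded
    dimensionality: str = "SINGLE"  # "SINGLE" or "MULTIPART"


def strict_match(upstream: BlobType, downstream: BlobType) -> bool:
    # Exact equality: an empty format never matches "csv".
    return upstream == downstream


def blob_casts_from(upstream: BlobType, downstream: BlobType) -> bool:
    # Assumed rule: dimensionality must agree, and an empty format on
    # either side is treated as compatible with any format.
    if upstream.dimensionality != downstream.dimensionality:
        return False
    if upstream.format == "" or downstream.format == "":
        return True
    return upstream.format == downstream.format


cached = BlobType(format="")  # like blob:{} in the log
declared = BlobType(format="csv")  # like blob:{format:"csv"} in the log

print(strict_match(cached, declared))  # False
print(blob_casts_from(cached, declared))  # True
```

Under these assumptions, the strict comparison reproduces the rejection seen in the log, while the blob-aware check accepts the cached artifact. It still rejects two different non-empty formats, or blobs whose dimensionality differs.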
How was this patch tested?
Setup process
Screenshots
Check all the applicable boxes
- [ ] I updated the documentation accordingly.
- [ ] All new and existing tests passed.
- [ ] All commits are signed-off.