
pin s3 worker count for kubernetes tasks

Open savingoyal opened this issue 1 year ago • 2 comments

savingoyal avatar Feb 11 '25 02:02 savingoyal

noting something extra that needs to be handled here:

from metaflow import step, FlowSpec, resources
import os

class S3WorkerFlow(FlowSpec):
    @resources(cpu=1)
    @step
    def start(self):
        # cpu=1 should pin the S3 worker count to 1 for this task
        val = int(os.environ.get("METAFLOW_S3_WORKER_COUNT"))
        assert val == 1, "Worker count should be 1!"
        self.next(self.bigger)
        self.next(self.bigger)

    @resources(cpu=4)
    @step
    def bigger(self):
        # more CPUs should scale the pinned worker count up
        val = int(os.environ.get("METAFLOW_S3_WORKER_COUNT"))
        assert val == 2, "Worker count should be 2!"
        self.next(self.end)
        self.next(self.end)

    @step
    def end(self):
        print("Done! 🏁")


if __name__ == "__main__":
    S3WorkerFlow()

Running this --with kubernetes or on Argo Workflows fails: the resource decorator values are cast to a string-float (e.g. "1.0"), which then fails to convert back to an int.
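A minimal sketch (not part of the original report) of reading the variable defensively, assuming it may arrive as a string-float such as "4.0"; s3_worker_count is a hypothetical helper, not Metaflow API:

```python
import os

def s3_worker_count(default=1):
    # Hypothetical helper: the env var may arrive as a string-float
    # such as "4.0" (see the comments below), so parse via float
    # before casting to int.
    raw = os.environ.get("METAFLOW_S3_WORKER_COUNT")
    if raw is None:
        return default
    return int(float(raw))

os.environ["METAFLOW_S3_WORKER_COUNT"] = "4.0"
assert s3_worker_count() == 4  # int("4.0") alone would raise ValueError
```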

saikonen avatar Feb 18 '25 18:02 saikonen

> running this --with kubernetes or argo workflows fails, as resource decorator values are cast to a str-float, which fails to convert back to an int

The cause is the differing defaults for the cpu value in @batch versus @kubernetes: @resources combined with @kubernetes produces a string-float cpu value, while @resources combined with @batch produces a string-int.
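A short sketch of the two value shapes described above and why only one of them breaks the int cast (the literal values "1.0" and "1" are illustrative assumptions, not the actual decorator defaults):

```python
# A string-float, as produced with @resources + @kubernetes,
# cannot be cast to int directly:
try:
    int("1.0")
except ValueError:
    print("ValueError: int() cannot parse '1.0'")

# A string-int, as produced with @resources + @batch, casts fine:
assert int("1") == 1

# Going through float first handles both shapes:
assert int(float("1.0")) == 1
```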

saikonen avatar Mar 04 '25 10:03 saikonen