pin s3 worker count for kubernetes tasks
noting something extra that needs to be handled here:
```python
from metaflow import step, FlowSpec, resources
import os


class S3WorkerFlow(FlowSpec):
    @resources(cpu=1)
    @step
    def start(self):
        val = int(os.environ.get("METAFLOW_S3_WORKER_COUNT"))
        assert val == 1, "Worker count should be 1!"
        self.next(self.bigger)

    @resources(cpu=4)
    @step
    def bigger(self):
        val = int(os.environ.get("METAFLOW_S3_WORKER_COUNT"))
        assert val == 2, "Worker count should be 2!"
        self.next(self.end)

    @step
    def end(self):
        print("Done! 🏁")


if __name__ == "__main__":
    S3WorkerFlow()
```
Running this `--with kubernetes` or on Argo Workflows fails, because the resource decorator values are cast to a string-encoded float, which then fails to convert back to an int.
The cause is the different default for the `cpu` value in `@batch` compared to `@kubernetes`: combining `@resources` with `@kubernetes` produces a string-float cpu value, while combining `@resources` with `@batch` produces a string-int cpu value.
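The conversion failure can be reproduced in isolation. The sketch below assumes the string forms are along the lines of `"1"` for the batch-style int and `"1.0"` for the kubernetes-style float (illustrative values, not taken from Metaflow's internals), and shows one defensive workaround — round-tripping through `float` before casting to `int` — which is only a suggestion, not Metaflow's actual fix:

```python
# A batch-style cpu value serializes as a string-int, which int() accepts.
batch_style = "1"
assert int(batch_style) == 1

# A kubernetes-style cpu value serializes as a string-float;
# int() rejects float-formatted strings outright.
kubernetes_style = "1.0"
try:
    int(kubernetes_style)
    raised = False
except ValueError:
    raised = True
assert raised, "int() should reject a float-formatted string"

# Defensive workaround (an assumption, not the upstream patch):
# parse as float first, then truncate to int.
assert int(float(kubernetes_style)) == 1
print("conversion behavior confirmed")
```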