[BUG] Failed to run a map task that fans out to 10000 tasks
Describe the bug
- k8s-array plugin
The example below works when the input is 1000, but fails when it is 10000. Sample code:
from typing import List
from flytekit import Resources, map_task, task, workflow
@task(cache_version="1.0", cache=True)
def a_mappable_task(a: int) -> str:
    """Return ``a + 2`` rendered as a decimal string.

    This is the per-element task that the workflow fans out with ``map_task``.
    """
    return f"{a + 2}"
@task(cache_version="1.0", cache=True)
def coalesce(b: List[str]) -> str:
    """Concatenate every string in ``b``, in order, with no separator."""
    separator = ""
    return separator.join(b)
@task(cache_version="1.0", cache=True)
def g_l(n: int) -> List[int]:
    """Return the integers ``0, 1, ..., n - 1`` as a list.

    The length of this list determines how many tasks the downstream
    ``map_task`` fans out to. A non-positive ``n`` yields an empty list.
    """
    # list(range(n)) replaces the manual append loop; the result is identical.
    return list(range(n))
@workflow
def wf(n: int = 10000) -> str:
    """Fan ``a_mappable_task`` out over ``n`` inputs and join the results.

    Builds the list ``0..n-1`` with ``g_l`` and maps ``a_mappable_task`` over
    it, which runs one array sub-task per element. It then concatenates the
    mapped string outputs with ``coalesce``. With ``n=1000`` this runs; with
    the default ``n=10000`` the k8s-array plugin panics (see the error below).
    """
    # Renamed from the ambiguous single-letter ``l`` (PEP 8 "Names to Avoid").
    numbers = g_l(n=n)
    mapped_out = map_task(a_mappable_task)(a=numbers)
    return coalesce(b=mapped_out)
if __name__ == "__main__":
    # Execute the workflow locally with its default input (n=10000).
    # The joined output string is not used afterwards, so it is not bound
    # to a name.
    wf()
Error:
Workflow[flytesnacks:development:development.workflow.map_task.wf] failed. RuntimeExecutionError: max number of system retry attempts [31/30] exhausted. Last known status message: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: panic when executing a plugin [k8s-array]. Stack: [goroutine 741 [running]:
runtime/debug.Stack()
/usr/local/go/src/runtime/debug/stack.go:24 +0x65
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1.1()
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:375 +0xfe
panic({0x1f45600, 0x3959500})
/usr/local/go/src/runtime/panic.go:838 +0x207
github.com/flyteorg/flytestdlib/bitarray.(*BitSet).IsSet(...)
/go/pkg/mod/github.com/flyteorg/[email protected]/bitarray/bitset.go:33
github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core.InitializeExternalResources({0x279ca70, 0xc006688a50}, {0x27a8360?, 0xc00154d760?}, 0xc005fd3440, 0x23d7cd8)
/go/pkg/mod/github.com/flyteorg/[email protected]/go/tasks/plugins/array/core/metadata.go:33 +0x1e1
github.com/flyteorg/flyteplugins/go/tasks/plugins/array/k8s.Executor.Handle({{0x7f33215a0ff0, 0xc000a7e380}, {{0x278fa10, 0xc00174a0b0}}, {{0x278fa10, 0xc00174a160}}}, {0x279ca70, 0xc006688a50}, {0x27a8360, 0xc00154d760})
/go/pkg/mod/github.com/flyteorg/[email protected]/go/tasks/plugins/array/k8s/executor.go:94 +0x225
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1(0x0?, {0x279ca70, 0xc006688810}, {0x279ef58?, 0xc000bfe240?}, 0x0?)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:382 +0x178
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin({{0x279cdb8, 0xc000a5d8d8}, {0x278a5a8, 0xc0009b4aa0}, 0xc0009c7260, 0xc0009c7290, 0xc0009c72c0, {0x279e4d8, 0xc0007f8000}, 0xc0009fc000, ...}, ...)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:384 +0x9a
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.Handle({{0x279cdb8, 0xc000a5d8d8}, {0x278a5a8, 0xc0009b4aa0}, 0xc0009c7260, 0xc0009c7290, 0xc0009c72c0, {0x279e4d8, 0xc0007f8000}, 0xc0009fc000, ...}, ...)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:616 +0x182b
github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.handleParentNode({{0x279fe08, 0xc0009f00d0}, {{0xc000c86160, {{...}, 0x0}, {0xc000484080, 0x4, 0x4}}, {0xc000c86180, {{...}, ...}, ...}, ...}, ...}, ...)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:70 +0xd8
github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.Handle({{0x279fe08, 0xc0009f00d0}, {{0xc000c86160, {{...}, 0x0}, {0xc000484080, 0x4, 0x4}}, {0xc000c86180, {{...}, ...}, ...}, ...}, ...}, ...)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:220 +0x9d0
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).execute(0xc0009e0000, {0x279ca70, 0xc006688330}, {0x279e358, 0xc000734000}, 0xc00808dec0, {0x27b0d58?, 0xc008983110?})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:382 +0x157
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleQueuedOrRunningNode(0xc0009e0000, {0x279ca70, 0xc006688330}, 0xc00808dec0, {0x279e358?, 0xc000734000?})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:512 +0x227
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleNode(0xc0009e0000, {0x279ca70, 0xc006688330}, {0x2782f10, 0xc005f9cf00}, 0xc00808dec0, {0x279e358?, 0xc000734000})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:736 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0009e0000, {0x279ca70, 0xc00682def0}, {0x27ac048, 0xc006fde640}, {0x2782f10, 0xc005f9cf00}, {0x2782f38?, 0xc005f9cf00?}, {0x27a9470, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:934 +0x705
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22fdce6?, {0x279ca70, 0xc00682def0}, {0x27ac048, 0xc006fde640}, {0x2782f10, 0xc005f9cf00?}, {0x2782f38?, 0xc005f9cf00}, {0x27a9470, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0009e0000, {0x279ca70, 0xc00682def0}, {0x27ac048, 0xc006fde640}, {0x2782f10, 0xc005f9cf00}, {0x2782f38?, 0xc005f9cf00?}, {0x27a9470, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22fdce6?, {0x279ca70, 0xc00682def0}, {0x27ac048, 0xc006fde640}, {0x2782f10, 0xc005f9cf00?}, {0x2782f38?, 0xc005f9cf00}, {0x27a9470, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0009e0000, {0x279ca70, 0xc00682def0}, {0x27ac048, 0xc006fde640}, {0x2782f10, 0xc005f9cf00}, {0x2782f38?, 0xc005f9cf00?}, {0x27a9470, ...})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935
github.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).handleRunningWorkflow(0xc000a22700, {0x279ca70, 0xc00682def0}, 0xc005f9cf00)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:147 +0x1b3
github.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).HandleFlyteWorkflow(0xc000a22700, {0x279ca70, 0xc00682def0}, 0xc005f9cf00)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:393 +0x40f
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow.func2(0xc0009ff6b0, {0x279ca70, 0xc00682def0}, 0xc0076db848, 0x1e5a060?)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:130 +0x18e
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow(0xc0009ff6b0, {0x279ca70, 0xc00682dad0}, 0xc0070ed400)
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:131 +0x459
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).Handle(0xc0009ff6b0, {0x279ca70, 0xc00682dad0}, {0xc005f09d40, 0x17}, {0xc005f09d58, 0x14})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:205 +0x86d
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem.func1(0xc0009e83f0, 0xc0076dbf28, {0x1e5a060?, 0xc007636360})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:88 +0x510
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem(0xc0009e83f0, {0x279ca70, 0xc00682dad0})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:99 +0xf1
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).runWorker(0x279ca70?, {0x279ca70, 0xc00096cf30})
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:115 +0xbd
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run.func1()
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:150 +0x59
created by github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run
/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:147 +0x285
]
Expected behavior
Support large fan-out tasks (10000+)
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
- [X] Yes
Have you read the Code of Conduct?
- [X] Yes
This should have failed with a limit error (because of the configured array size limits) rather than a panic. cc @hamersaw
@pingsutw would it be helpful if we created a broader load-testing suite? I'm thinking of something like what the Kubernetes scalability SIG (https://github.com/kubernetes/community/blob/master/sig-scalability/README.md) does for each release.
My company needs something like this in the short term to explore how Flyte interacts with cluster autoscaling at very large scale. If it were folded into flytesnacks, the same suite could also provide easy starter workflows for periodically testing the limits of FlytePropeller and other components.
We have a ticket in our internal Jira to start building this out. Could we open a PR adding the load-testing workflows to flytesnacks? We'll be borrowing this example for map tasks, and writing a similar one for dynamic workflows with varying input types, etc.