flyte icon indicating copy to clipboard operation
flyte copied to clipboard

[BUG] Failed to run a map task that fans out to 10000 tasks

Open pingsutw opened this issue 3 years ago • 1 comments

Describe the bug

  • k8s-array plugin

The example below works when the input size is 1000, but fails when it is 10000. Sample code:

from typing import List
from flytekit import Resources, map_task, task, workflow
​
​
@task(cache_version="1.0", cache=True)
def a_mappable_task(a: int) -> str:
    """Return ``a + 2`` rendered as a string."""
    return str(a + 2)
​
​
@task(cache_version="1.0", cache=True)
def coalesce(b: List[str]) -> str:
    """Concatenate every string in ``b``, in order, with no separator."""
    return "".join(b)
​
​
@task(cache_version="1.0", cache=True)
def g_l(n: int) -> List[int]:
    """Return the integers ``0`` through ``n - 1`` as a list."""
    return list(range(n))
​
​
@workflow
def wf(n: int = 10000) -> str:
    """Fan a mapped task out over ``n`` integers and join the results.

    Builds the input list with ``g_l``, applies ``a_mappable_task`` to every
    element through ``map_task``, then concatenates the mapped outputs with
    ``coalesce``.
    """
    numbers = g_l(n=n)
    stringified_numbers = map_task(a_mappable_task)(a=numbers)
    return coalesce(b=stringified_numbers)
​
​
if __name__ == "__main__":
    # Call the workflow with its default input (n=10000) when this file is
    # executed as a script; the returned value is stored but not used further.
    result = wf()

Error:

Workflow[flytesnacks:development:development.workflow.map_task.wf] failed. RuntimeExecutionError: max number of system retry attempts [31/30] exhausted. Last known status message: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: panic when executing a plugin [k8s-array]. Stack: [goroutine 741 [running]:
runtime/debug.Stack()
	/usr/local/go/src/runtime/debug/stack.go:24 +0x65
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1.1()
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:375 +0xfe
panic({0x1f45600, 0x3959500})
	/usr/local/go/src/runtime/panic.go:838 +0x207
github.com/flyteorg/flytestdlib/bitarray.(*BitSet).IsSet(...)
	/go/pkg/mod/github.com/flyteorg/[email protected]/bitarray/bitset.go:33
github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core.InitializeExternalResources({0x279ca70, 0xc006688a50}, {0x27a8360?, 0xc00154d760?}, 0xc005fd3440, 0x23d7cd8)
	/go/pkg/mod/github.com/flyteorg/[email protected]/go/tasks/plugins/array/core/metadata.go:33 +0x1e1
github.com/flyteorg/flyteplugins/go/tasks/plugins/array/k8s.Executor.Handle({{0x7f33215a0ff0, 0xc000a7e380}, {{0x278fa10, 0xc00174a0b0}}, {{0x278fa10, 0xc00174a160}}}, {0x279ca70, 0xc006688a50}, {0x27a8360, 0xc00154d760})
	/go/pkg/mod/github.com/flyteorg/[email protected]/go/tasks/plugins/array/k8s/executor.go:94 +0x225
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1(0x0?, {0x279ca70, 0xc006688810}, {0x279ef58?, 0xc000bfe240?}, 0x0?)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:382 +0x178
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin({{0x279cdb8, 0xc000a5d8d8}, {0x278a5a8, 0xc0009b4aa0}, 0xc0009c7260, 0xc0009c7290, 0xc0009c72c0, {0x279e4d8, 0xc0007f8000}, 0xc0009fc000, ...}, ...)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:384 +0x9a
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.Handle({{0x279cdb8, 0xc000a5d8d8}, {0x278a5a8, 0xc0009b4aa0}, 0xc0009c7260, 0xc0009c7290, 0xc0009c72c0, {0x279e4d8, 0xc0007f8000}, 0xc0009fc000, ...}, ...)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:616 +0x182b
github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.handleParentNode({{0x279fe08, 0xc0009f00d0}, {{0xc000c86160, {{...}, 0x0}, {0xc000484080, 0x4, 0x4}}, {0xc000c86180, {{...}, ...}, ...}, ...}, ...}, ...)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:70 +0xd8
github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.Handle({{0x279fe08, 0xc0009f00d0}, {{0xc000c86160, {{...}, 0x0}, {0xc000484080, 0x4, 0x4}}, {0xc000c86180, {{...}, ...}, ...}, ...}, ...}, ...)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:220 +0x9d0
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).execute(0xc0009e0000, {0x279ca70, 0xc006688330}, {0x279e358, 0xc000734000}, 0xc00808dec0, {0x27b0d58?, 0xc008983110?})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:382 +0x157
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleQueuedOrRunningNode(0xc0009e0000, {0x279ca70, 0xc006688330}, 0xc00808dec0, {0x279e358?, 0xc000734000?})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:512 +0x227
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleNode(0xc0009e0000, {0x279ca70, 0xc006688330}, {0x2782f10, 0xc005f9cf00}, 0xc00808dec0, {0x279e358?, 0xc000734000})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:736 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0009e0000, {0x279ca70, 0xc00682def0}, {0x27ac048, 0xc006fde640}, {0x2782f10, 0xc005f9cf00}, {0x2782f38?, 0xc005f9cf00?}, {0x27a9470, ...})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:934 +0x705
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22fdce6?, {0x279ca70, 0xc00682def0}, {0x27ac048, 0xc006fde640}, {0x2782f10, 0xc005f9cf00?}, {0x2782f38?, 0xc005f9cf00}, {0x27a9470, ...})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0009e0000, {0x279ca70, 0xc00682def0}, {0x27ac048, 0xc006fde640}, {0x2782f10, 0xc005f9cf00}, {0x2782f38?, 0xc005f9cf00?}, {0x27a9470, ...})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22fdce6?, {0x279ca70, 0xc00682def0}, {0x27ac048, 0xc006fde640}, {0x2782f10, 0xc005f9cf00?}, {0x2782f38?, 0xc005f9cf00}, {0x27a9470, ...})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0009e0000, {0x279ca70, 0xc00682def0}, {0x27ac048, 0xc006fde640}, {0x2782f10, 0xc005f9cf00}, {0x2782f38?, 0xc005f9cf00?}, {0x27a9470, ...})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935
github.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).handleRunningWorkflow(0xc000a22700, {0x279ca70, 0xc00682def0}, 0xc005f9cf00)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:147 +0x1b3
github.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).HandleFlyteWorkflow(0xc000a22700, {0x279ca70, 0xc00682def0}, 0xc005f9cf00)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:393 +0x40f
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow.func2(0xc0009ff6b0, {0x279ca70, 0xc00682def0}, 0xc0076db848, 0x1e5a060?)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:130 +0x18e
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow(0xc0009ff6b0, {0x279ca70, 0xc00682dad0}, 0xc0070ed400)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:131 +0x459
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).Handle(0xc0009ff6b0, {0x279ca70, 0xc00682dad0}, {0xc005f09d40, 0x17}, {0xc005f09d58, 0x14})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:205 +0x86d
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem.func1(0xc0009e83f0, 0xc0076dbf28, {0x1e5a060?, 0xc007636360})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:88 +0x510
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem(0xc0009e83f0, {0x279ca70, 0xc00682dad0})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:99 +0xf1
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).runWorker(0x279ca70?, {0x279ca70, 0xc00096cf30})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:115 +0xbd
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run.func1()
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:150 +0x59
created by github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:147 +0x285
]

Expected behavior

Support large fan-out tasks (10000+)

Additional context to reproduce

No response

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • [X] Yes

Have you read the Code of Conduct?

  • [X] Yes

pingsutw avatar Aug 05 '22 01:08 pingsutw

This should have failed because of limits - cc @hamersaw

kumare3 avatar Aug 05 '22 04:08 kumare3

@pingsutw would it be helpful if we created a broader load testing suite? I'm thinking something like the k8s SIG (https://github.com/kubernetes/community/blob/master/sig-scalability/README.md) does for each release.

My company needs something like this in the short term to explore Flyte <> cluster autoscaling interactions at very large scale, but I'm thinking the same thing (if folded into flyte snacks etc.) could give some easy starter workflows to occasionally test flyte propeller etc. limits.

We have a Jira ticket up in our internal Jira to start building this out; could we PR the load-testing workflows into flytesnacks? We'll be stealing this one for map tasks (and doing a similar one for dynamics with varying input types, etc.)

CalvinLeather avatar Aug 22 '22 15:08 CalvinLeather