Add Node Selector attribute to KF plugins
Why are the changes needed?
Currently, for flyteplugins-kftensorflow (and similarly for pytorch and mpi), there are use cases where one would want to use preemptible/interruptible nodes for workers and the chief but not for parameter servers. This is not possible today because node selectors cannot be specified per replica type. This PR, along with a flytekit PR, introduces this functionality. <link to related flytekit PR>
How was this patch tested?
Example run with flyte sandbox
import os
import time
from flytekit import task, workflow, ImageSpec, dynamic, Resources
from flytekitplugins.kftensorflow import TfJob, Chief, Worker, PS
kubeflow_plugin = "git+https://github.com/alykhantejani/flytekit.git@1a0302028868355dd3e9a148f260a58a947a3730#subdirectory=plugins/flytekit-kf-tensorflow"
envd_plugin = "git+https://github.com/alykhantejani/flytekit.git@1a0302028868355dd3e9a148f260a58a947a3730#subdirectory=plugins/flytekit-envd"
kubeflow_idl = "git+https://github.com/alykhantejani/flyte.git@9dcc23813925ccc7ebbdfe9f31eae704141937af#subdirectory=flyteidl"
flytekit = 'git+https://github.com/alykhantejani/flytekit.git@1a0302028868355dd3e9a148f260a58a947a3730'
custom_image = ImageSpec(
    packages=[envd_plugin, flytekit, kubeflow_idl, kubeflow_plugin],
    apt_packages=["git"],
    name="kftensorflow-flyte-plugin",
    registry="localhost:30000",
)

@task(
    task_config=TfJob(worker=Worker(replicas=1), ps=PS(replicas=1), chief=Chief(replicas=1, node_selectors={"something": "value"})),
    container_image=custom_image
)
def launch(name: str) -> None:
    print(os.environ)

@workflow
def tf_job_test() -> None:
    launch(name="hello world")
Check all the applicable boxes
- [ ] I updated the documentation accordingly.
- [ ] All new and existing tests passed.
- [x] All commits are signed-off.
Related PRs
flytekit https://github.com/flyteorg/flytekit/pull/2183
Hi @alykhantejani, can you provide a screenshot of your PR running? I will spend some time testing it, thank you.
This makes a lot of sense, thanks. I will make some changes.
On Fri, Feb 9, 2024 at 5:21 PM Haytham Abuelfutuh @.***> wrote:
@.**** commented on this pull request.
In flyteidl/protos/flyteidl/plugins/kubeflow/mpi.proto https://github.com/flyteorg/flyte/pull/4873#discussion_r1484605069:
- // Node selectors for the replica group
- map<string, string> node_selectors = 6;
I think the change should, unfortunately, be more involved than this... In general we try to abstract away k8s (and infrastructure) concepts when users write code. For instance, when you write a task you can add @task(interruptible=True) and not have to think about the right node selectors to add... the administrator would configure those at the propeller level... This makes the code more portable and separates infra concerns.
The way I would implement this change is:
1. Add
   oneof is_interruptible { bool interruptible = 6; }
   This allows you to know if the field is set and if the value it was set to was False... That or create an enum with 3 states (UNSET, TRUE, FALSE)
2. When the pod is being built here: https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go#L270, you can see it checks for the interruptible field from task execution metadata: https://github.com/flyteorg/flyte/blob/55a67f23f5d8e44bc7d70f738b0fafe954e0bf94/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go#L222... so what you want to do is refactor some of these common functions to allow you to call them from here: replicaSpec, err := common.ToReplicaSpec(ctx, taskCtx, kubeflowv1.MPIJobDefaultContainerName) based on the above flag (whether that's set at the worker pod vs launcher pod)
This way the user can just say: @task(config=TFTraining(worker=PodSpec(interruptible=True))) or something and it would use the same settings configured for @task(interruptible=True) just for the worker...
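The tri-state idea above can be sketched in plain Python (rather than protobuf or Go). This is only an illustration under stated assumptions: `ReplicaSettings` and `effective_interruptible` are hypothetical names, not part of flytekit or flyteplugins, and `None` stands in for the unset `oneof` so that an explicit `False` on a replica can be told apart from "not specified".

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ReplicaSettings:
    """Hypothetical per-replica settings; interruptible=None means 'not set'."""
    replicas: int = 1
    interruptible: Optional[bool] = None


def effective_interruptible(task_level: bool, replica: ReplicaSettings) -> bool:
    """Use the replica-level value if it was set, else fall back to the task-level flag."""
    if replica.interruptible is None:
        return task_level
    return replica.interruptible


# Task-level default is interruptible=True; the parameter server opts out explicitly.
worker = ReplicaSettings(replicas=2)                  # unset: inherits the task-level flag
ps = ReplicaSettings(replicas=1, interruptible=False)  # explicit False overrides it

for name, spec in {"worker": worker, "ps": ps}.items():
    print(name, effective_interruptible(task_level=True, replica=spec))
```

With a plain `bool` field there would be no way to distinguish the `ps` case from an unset field, which is why the comment suggests a `oneof` or a three-state enum.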
Happy to jump on a call if you want to discuss further...
cc @Future-Outlier https://github.com/Future-Outlier
@alykhantejani
Hi, here is a full guide for you to set things up.
Set up the flyte cluster
- Start dev mode with flytectl
flytectl demo start --dev
- Start kubeflow training operator
export KUBECONFIG=$KUBECONFIG:~/.kube/config:~/.flyte/k3s/k3s.yaml
kustomize build "https://github.com/kubeflow/training-operator.git/manifests/overlays/standalone?ref=v1.5.0" | kubectl apply -f -
- Adjust the config in your flyte-single-binary-local-dev.yaml
tasks:
  task-plugins:
    default-for-task-types:
      container: container
      container_array: k8s-array
      sidecar: sidecar
      tensorflow: tensorflow
    enabled-plugins:
      - container
      - k8s-array
      - sidecar
      - tensorflow
- compile flyte binary in flyte repo
cd flyte
make compile
POD_NAMESPACE=flyte flyte start --config ./flyte-single-binary-local-dev.yaml
reference: https://docs.flyte.org/en/latest/deployment/plugins/k8s/index.html#
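As a quick sanity check on the config step above, here is a small sketch that reports what is missing for a given task type. It assumes the config has already been parsed into a plain dict (for example with a YAML parser, not shown here); `missing_plugin_settings` is a hypothetical helper, not a Flyte API, and the dict below just mirrors the snippet from this guide.

```python
def missing_plugin_settings(config: dict, task_type: str) -> list:
    """Return human-readable problems for task_type; an empty list means it looks fine."""
    plugins = config.get("tasks", {}).get("task-plugins", {})
    problems = []
    if task_type not in plugins.get("enabled-plugins", []):
        problems.append(f"{task_type!r} is not listed under enabled-plugins")
    if task_type not in plugins.get("default-for-task-types", {}):
        problems.append(f"{task_type!r} has no entry under default-for-task-types")
    return problems


# Dict form of the YAML shown in this guide (assumed to be already parsed).
config = {
    "tasks": {
        "task-plugins": {
            "default-for-task-types": {
                "container": "container",
                "container_array": "k8s-array",
                "sidecar": "sidecar",
                "tensorflow": "tensorflow",
            },
            "enabled-plugins": ["container", "k8s-array", "sidecar", "tensorflow"],
        }
    }
}

print(missing_plugin_settings(config, "tensorflow"))
print(missing_plugin_settings(config, "pytorch"))
```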
run in remote environment
- Build a custom image. You need (1) flyteidl's git SHA (in the flyte repo) and (2) flytekit's git SHA (in the flytekit repo).
Here is my example from testing a previous PR. You have to (1) replace the remote git repo URLs with yours and (2) replace the git SHAs with yours.
FROM python:3.9-slim-buster
USER root
WORKDIR /root
ENV PYTHONPATH /root
RUN apt-get update && apt-get install build-essential -y
RUN apt-get install git -y
RUN pip install -U git+https://github.com/Future-Outlier/flytekit.git@98ddd542a02551a9a9eb122b98004d0d092abbe9#subdirectory=plugins/flytekit-kf-tensorflow
RUN pip install -U git+https://github.com/Future-Outlier/flyte.git@647b8f4eeeab1a65866d19fab13c416ed0e4a07f#subdirectory=flyteidl
- Run the task, specifying your image
pyflyte run --remote --image <your-kubeflow-flytekit-image> task.py wf
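Since the image step above comes down to swapping the fork owner and git SHA in a few pip URLs, a small helper can make that less error-prone. This is just a sketch: `git_pip_url` is a hypothetical function, and the URL shape is copied from the `pip install` lines in this thread.

```python
from typing import Optional


def git_pip_url(owner: str, repo: str, sha: str, subdirectory: Optional[str] = None) -> str:
    """Build a pip-installable GitHub URL pinned to a commit, optionally to a subdirectory."""
    url = f"git+https://github.com/{owner}/{repo}.git@{sha}"
    if subdirectory:
        url += f"#subdirectory={subdirectory}"
    return url


# Values taken from the Dockerfile above; replace the owner and SHAs with your own.
plugin_url = git_pip_url(
    "Future-Outlier",
    "flytekit",
    "98ddd542a02551a9a9eb122b98004d0d092abbe9",
    subdirectory="plugins/flytekit-kf-tensorflow",
)
idl_url = git_pip_url(
    "Future-Outlier",
    "flyte",
    "647b8f4eeeab1a65866d19fab13c416ed0e4a07f",
    subdirectory="flyteidl",
)

print(f"RUN pip install -U {plugin_url}")
print(f"RUN pip install -U {idl_url}")
```

The same strings can be passed to `ImageSpec(packages=[...])` as in the PR description's example.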
https://github.com/flyteorg/flyte/issues/4167