
Add Node Selector attribute to KF plugins

Open alykhantejani opened this issue 2 years ago • 5 comments

Why are the changes needed?

Currently, for flyteplugins-kftensorflow (and similarly for PyTorch and MPI), there are use cases where one would want to use preemptible/interruptible nodes for the workers and chief but not for the parameter servers. This is not possible today because node selectors cannot be specified per replica type. This PR, along with a flytekit PR, introduces this functionality. <link to related flytekit PR>

How was this patch tested?

Example run with the Flyte sandbox:

import os

from flytekit import task, workflow, ImageSpec

from flytekitplugins.kftensorflow import TfJob, Chief, Worker, PS

kubeflow_plugin = "git+https://github.com/alykhantejani/flytekit.git@1a0302028868355dd3e9a148f260a58a947a3730#subdirectory=plugins/flytekit-kf-tensorflow"
envd_plugin = "git+https://github.com/alykhantejani/flytekit.git@1a0302028868355dd3e9a148f260a58a947a3730#subdirectory=plugins/flytekit-envd"
kubeflow_idl = "git+https://github.com/alykhantejani/flyte.git@9dcc23813925ccc7ebbdfe9f31eae704141937af#subdirectory=flyteidl"
flytekit = 'git+https://github.com/alykhantejani/flytekit.git@1a0302028868355dd3e9a148f260a58a947a3730'

custom_image = ImageSpec(
    packages=[envd_plugin, flytekit, kubeflow_idl, kubeflow_plugin],
    apt_packages=["git"],
    name="kftensorflow-flyte-plugin",
    registry="localhost:30000",
)


@task(
    task_config=TfJob(
        worker=Worker(replicas=1),
        ps=PS(replicas=1),
        # Only the chief gets a node selector here; worker and PS use the default nodes.
        chief=Chief(replicas=1, node_selectors={"something": "value"}),
    ),
    container_image=custom_image,
)
def launch(name: str) -> None:
    print(os.environ)



@workflow
def tf_job_test() -> None:
    launch(name="hello world")
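To make the intent concrete, here is a minimal sketch of what per-replica node selectors amount to in the TFJob manifest handed to the Kubeflow training operator. It assumes the operator's `tfReplicaSpecs` layout (`Chief`, `Worker`, `PS`, each with its own pod `template`); the helper `build_tfjob_replica_specs`, the image tag, and the use of the GKE `cloud.google.com/gke-preemptible` label are illustrative assumptions, not code from this PR.

```python
from typing import Dict, Optional


def build_tfjob_replica_specs(
    replicas: Dict[str, int],
    node_selectors: Dict[str, Optional[Dict[str, str]]],
    image: str,
) -> Dict[str, dict]:
    """Hypothetical helper: build a TFJob-style tfReplicaSpecs mapping.

    Each replica type gets its own pod template, so a node selector set on one
    replica type (e.g. Worker) does not leak into another (e.g. PS).
    """
    specs: Dict[str, dict] = {}
    for replica_type, count in replicas.items():
        pod_spec: dict = {"containers": [{"name": "tensorflow", "image": image}]}
        selector = node_selectors.get(replica_type)
        if selector:  # only emit nodeSelector when one was requested
            pod_spec["nodeSelector"] = dict(selector)
        specs[replica_type] = {"replicas": count, "template": {"spec": pod_spec}}
    return specs


preemptible = {"cloud.google.com/gke-preemptible": "true"}
specs = build_tfjob_replica_specs(
    replicas={"Chief": 1, "Worker": 2, "PS": 1},
    node_selectors={"Chief": preemptible, "Worker": preemptible, "PS": None},
    image="localhost:30000/kftensorflow-flyte-plugin:latest",
)
print(specs["Worker"]["template"]["spec"]["nodeSelector"])
print("nodeSelector" in specs["PS"]["template"]["spec"])  # False: PS gets no nodeSelector
```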

Check all the applicable boxes

  • [ ] I updated the documentation accordingly.
  • [ ] All new and existing tests passed.
  • [x] All commits are signed-off.

Related PRs

flytekit https://github.com/flyteorg/flytekit/pull/2183

(alykhantejani, Feb 09 '24 13:02)

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if not, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).

(welcome[bot], Feb 09 '24 13:02)

Hi @alykhantejani, can you provide a screenshot of your PR? I will spend some time testing it. Thank you.

(Future-Outlier, Feb 09 '24 14:02)

This makes a lot of sense, thanks. I will make some changes.

On Fri, Feb 9, 2024 at 5:21 PM Haytham Abuelfutuh wrote:


In flyteidl/protos/flyteidl/plugins/kubeflow/mpi.proto https://github.com/flyteorg/flyte/pull/4873#discussion_r1484605069:

    // Node selectors for the replica group
    map<string, string> node_selectors = 6;

I think the change should, unfortunately, be more involved than this. In general, we try to abstract away k8s (and infrastructure) concepts when users write code. For instance, when you write an @task, you can add @task(interruptible=True) and not have to think about the right node selectors to add; the administrator configures those at the propeller level. This makes the code more portable and separates infrastructure concerns.
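The separation described here is that users state intent (`interruptible=True`) while the platform decides what that means on the cluster. A minimal sketch of that split, assuming a hypothetical `AdminConfig` holding an interruptible node selector and toleration; propeller's real configuration keys and merge logic may differ.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class AdminConfig:
    # Hypothetical stand-ins for platform-level settings owned by an administrator.
    interruptible_node_selector: Dict[str, str] = field(default_factory=dict)
    interruptible_tolerations: List[dict] = field(default_factory=list)


def apply_interruptible(pod_spec: dict, interruptible: bool, cfg: AdminConfig) -> dict:
    """Return a copy of pod_spec with the admin's interruptible settings merged in."""
    spec = dict(pod_spec)
    if not interruptible:
        return spec  # non-interruptible pods are left untouched
    selector = dict(spec.get("nodeSelector", {}))
    selector.update(cfg.interruptible_node_selector)
    spec["nodeSelector"] = selector
    spec["tolerations"] = list(spec.get("tolerations", [])) + list(cfg.interruptible_tolerations)
    return spec


cfg = AdminConfig(
    interruptible_node_selector={"cloud.google.com/gke-preemptible": "true"},
    interruptible_tolerations=[{"key": "preemptible", "operator": "Exists", "effect": "NoSchedule"}],
)
print(apply_interruptible({"containers": []}, True, cfg)["nodeSelector"])
print(apply_interruptible({"containers": []}, False, cfg))
```

The user-facing code only ever passes a boolean; changing clouds or node pools means editing `AdminConfig`, not the task.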

The way I would implement this change is:

  1. Add

    oneof is_interruptible { bool interruptible = 6; }

    This lets you know whether the field is set and, if it is, whether it was set to False. Alternatively, create an enum with 3 states (UNSET, TRUE, FALSE).
  2. When the pod is being built here: https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go#L270, you can see it checks the interruptible field from the task execution metadata: https://github.com/flyteorg/flyte/blob/55a67f23f5d8e44bc7d70f738b0fafe954e0bf94/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go#L222. So what you want to do is refactor some of these common functions so that, based on the above flag (whether it is set for the worker pod or the launcher pod), you can call them from here:

    replicaSpec, err := common.ToReplicaSpec(ctx, taskCtx, kubeflowv1.MPIJobDefaultContainerName)

This way the user can just write @task(config=TFTraining(worker=PodSpec(interruptible=True))) or something similar, and it would apply the same settings configured for @task(interruptible=True), but only to the worker.
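The tri-state field matters because an unset replica-level value must fall back to the task-level `interruptible` flag, while an explicit `False` must override it. A minimal sketch of that resolution rule, using `Optional[bool]` to model the `oneof` (or the UNSET/TRUE/FALSE enum); the function names and replica keys are illustrative, not Flyte's API.

```python
from typing import Dict, Optional


def resolve_interruptible(task_level: bool, replica_override: Optional[bool]) -> bool:
    """None means 'unset' (inherit from the task); True/False are explicit overrides."""
    return task_level if replica_override is None else replica_override


def resolve_all(task_level: bool, overrides: Dict[str, Optional[bool]]) -> Dict[str, bool]:
    # Apply the rule independently to each replica group (e.g. chief, worker, ps).
    return {name: resolve_interruptible(task_level, value) for name, value in overrides.items()}


# Task is interruptible overall, but parameter servers explicitly opt out.
print(resolve_all(True, {"chief": None, "worker": None, "ps": False}))
# → {'chief': True, 'worker': True, 'ps': False}
```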

Happy to jump on a call if you want to discuss further...

cc @Future-Outlier https://github.com/Future-Outlier


(alykhantejani, Feb 09 '24 17:02)

@alykhantejani

Hi, here is a full guide to setting up your environment.

Set up a Flyte cluster

  1. Start the demo cluster in dev mode with flytectl:
flytectl demo start --dev
  2. Start the Kubeflow training operator:
export KUBECONFIG=$KUBECONFIG:~/.kube/config:~/.flyte/k3s/k3s.yaml
kustomize build "https://github.com/kubeflow/training-operator.git/manifests/overlays/standalone?ref=v1.5.0" | kubectl apply -f -
  3. Adjust the config in your flyte-single-binary-local-dev.yaml to enable the tensorflow plugin:
tasks:
  task-plugins:
    default-for-task-types:
      container: container
      container_array: k8s-array
      sidecar: sidecar
      tensorflow: tensorflow
    enabled-plugins:
    - container
    - k8s-array
    - sidecar
    - tensorflow
  4. Compile the flyte binary in the flyte repo and start it:
cd flyte
make compile
POD_NAMESPACE=flyte flyte start --config ./flyte-single-binary-local-dev.yaml

Reference: https://docs.flyte.org/en/latest/deployment/plugins/k8s/index.html

Run in a remote environment

  1. Build a custom image: (1) get flyteidl's git SHA (in the flyte repo) and (2) get flytekit's git SHA (in the flytekit repo).

Here is my example from testing a previous PR. You have to (1) replace the remote git repo URLs with yours and (2) replace the git SHAs with yours.

FROM python:3.9-slim-buster
USER root
WORKDIR /root
ENV PYTHONPATH /root
RUN apt-get update && apt-get install build-essential -y
RUN apt-get install git -y

RUN pip install -U git+https://github.com/Future-Outlier/flytekit.git@98ddd542a02551a9a9eb122b98004d0d092abbe9#subdirectory=plugins/flytekit-kf-tensorflow

RUN pip install -U git+https://github.com/Future-Outlier/flyte.git@647b8f4eeeab1a65866d19fab13c416ed0e4a07f#subdirectory=flyteidl
  2. Run the task, specifying your image:
pyflyte run --remote --image <your-kubeflow-flytekit-image> task.py wf
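Swapping in your own fork and commits is mostly string assembly, and a typo in a SHA or subdirectory is an easy way to end up testing the wrong code. A small sketch that builds the pip VCS requirement strings used in the Dockerfile above; `vcs_requirement` is a hypothetical helper, and the owner and SHAs are the ones from that Dockerfile, to be replaced with yours.

```python
from typing import Optional


def vcs_requirement(owner: str, repo: str, sha: str, subdirectory: Optional[str] = None) -> str:
    """Build git+https://github.com/<owner>/<repo>.git@<sha>[#subdirectory=<path>]."""
    url = f"git+https://github.com/{owner}/{repo}.git@{sha}"
    if subdirectory:
        url += f"#subdirectory={subdirectory}"
    return url


flytekit_sha = "98ddd542a02551a9a9eb122b98004d0d092abbe9"
flyte_sha = "647b8f4eeeab1a65866d19fab13c416ed0e4a07f"
lines = [
    "RUN pip install -U " + vcs_requirement("Future-Outlier", "flytekit", flytekit_sha, "plugins/flytekit-kf-tensorflow"),
    "RUN pip install -U " + vcs_requirement("Future-Outlier", "flyte", flyte_sha, "flyteidl"),
]
print("\n".join(lines))
```

The same strings can be passed to `ImageSpec(packages=[...])` instead of a hand-written Dockerfile, as in the example at the top of the PR.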

(Future-Outlier, Feb 10 '24 07:02)

https://github.com/flyteorg/flyte/issues/4167

(Future-Outlier, Feb 18 '24 06:02)