flytekit

[wip] generic spark task

Open kumare3 opened this issue 4 years ago • 4 comments

Signed-off-by: Ketan Umare [email protected]

TL;DR

Please replace this text with a description of what this PR accomplishes.

Type

  • [ ] Bug Fix
  • [ ] Feature
  • [ ] Plugin

Are all requirements met?

  • [ ] Code completed
  • [ ] Smoke tested
  • [ ] Unit tests added
  • [ ] Code documentation added
  • [ ] Any pending items have an associated Issue

Complete description

How did you fix the bug, make the feature etc. Link to any design docs etc

Tracking Issue

https://github.com/lyft/flyte/issues/

Follow-up issue

NA OR https://github.com/lyft/flyte/issues/

kumare3 · Dec 08 '21 00:12

Codecov Report

Merging #767 (880f6a1) into master (df6a949) will decrease coverage by 0.00%. The diff coverage is n/a.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #767      +/-   ##
==========================================
- Coverage   85.80%   85.80%   -0.01%     
==========================================
  Files         343      343              
  Lines       29428    29440      +12     
  Branches     2427     2428       +1     
==========================================
+ Hits        25251    25261      +10     
- Misses       3531     3533       +2     
  Partials      646      646              
Impacted Files                              Coverage Δ
tests/flytekit/unit/core/test_map_task.py  94.00% <0.00%> (-1.56%) ↓
flytekit/core/task.py                       80.48% <0.00%> (ø)
flytekit/core/map_task.py                   81.63% <0.00%> (+0.38%) ↑

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Powered by Codecov. Last update df6a949...880f6a1.

codecov[bot] · Dec 08 '21 01:12

@kumare3 I'd like to try updating this PR to support launching Scala/Java jobs on a Databricks cluster. I think I could build on the skeleton here, but I'm not too sure about a couple of details since I've just started using Flyte this week!

I see there's a SparkJob abstraction that could be used; the spark_type will need to be set correctly. What's not so clear to me is:

  1. How is the SparkJob class used to execute a command, i.e. where in Flyte do the contents of this class get converted into a command?
  2. How are the Databricks parameters used to construct the API request?

Any help with these questions would go a long way toward helping me contribute generic Spark capabilities to Flyte.
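
To make my mental model concrete, here is roughly the kind of task configuration I imagine a generic Spark task would need to carry. Everything below is illustrative and hand-written, not actual flytekit or flyteidl fields:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List


class SparkType(Enum):
    # The application types a generic SparkJob would need to distinguish.
    PYTHON = "python"
    JAVA = "java"
    SCALA = "scala"
    R = "r"


@dataclass
class GenericSparkConfig:
    """Hypothetical task_config for a non-Python Spark job (names illustrative)."""

    spark_type: SparkType = SparkType.SCALA
    main_class: str = ""                      # e.g. "com.example.MyJob"
    main_application_file: str = ""           # e.g. "dbfs:/jars/my-job.jar"
    parameters: List[str] = field(default_factory=list)
    spark_conf: Dict[str, str] = field(default_factory=dict)
    hadoop_conf: Dict[str, str] = field(default_factory=dict)
```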

kochhar-mw · May 14 '24 10:05

@kochhar-mw firstly, welcome, and thank you for offering to contribute.

Re "How are the Databricks parameters used to construct the API request?": for the Databricks API call, refer to the Databricks agent; that is how the config is marshalled to the backend and on to the agent.
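
At a high level the flow is: the task serializes its config into the task template's custom field, and the agent reads that dict back when it builds the Databricks request. A minimal sketch of that flow, assuming the usual flytekit plugin pattern rather than the literal agent code:

```python
from google.protobuf.json_format import MessageToDict


def task_side_get_custom(spark_job_pb) -> dict:
    # Task side: the SparkJob protobuf built from the Python task config is
    # flattened to a plain dict and stored in the TaskTemplate's `custom` field.
    return MessageToDict(spark_job_pb)


def agent_side_build_payload(custom: dict) -> dict:
    # Agent side: read that dict back out of the task template and turn it into
    # a Databricks Jobs API request body. Key names here are illustrative.
    payload = dict(custom.get("databricksConf", {}))  # cluster spec, run name, ...
    payload["spark_python_task"] = {                  # the key the current agent uses
        "python_file": custom.get("mainApplicationFile", ""),
    }
    return payload
```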

This is where the Spark context gets created: https://github.com/flyteorg/flytekit/blob/1ca3d91876002534f656688c327f7411ad3f1f89/plugins/flytekit-spark/flytekitplugins/spark/task.py#L69-L98
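
Roughly speaking, that hook builds a SparkSession from the task's spark_conf before the user code runs. A simplified sketch (the linked lines are authoritative):

```python
from pyspark.sql import SparkSession


def build_spark_session(spark_conf: dict, app_name: str) -> SparkSession:
    # Apply each entry of the task's spark_conf and create (or reuse) a session,
    # mirroring what the linked pre-execute logic does in simplified form.
    builder = SparkSession.builder.appName(app_name)
    for key, value in spark_conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```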

All the CLI handling is done in the base class: https://github.com/flyteorg/flytekit/blob/1ca3d91876002534f656688c327f7411ad3f1f89/flytekit/core/python_auto_container.py#L132
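
For orientation, the default container command that base class assembles is roughly of this shape; flag names and resolver arguments are approximate, and the linked file is authoritative:

```python
# Approximate shape of the default container command; placeholders like
# {{.input}} are substituted by the Flyte backend at execution time.
default_command = [
    "pyflyte-execute",
    "--inputs", "{{.input}}",
    "--output-prefix", "{{.outputPrefix}}",
    "--raw-output-data-prefix", "{{.rawOutputDataPrefix}}",
    "--resolver", "flytekit.core.python_auto_container.default_task_resolver",
    "--",
    "task-module", "my_module",   # illustrative resolver args
    "task-name", "my_task",
]
```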

And the Spark magic happens through the container image and https://github.com/flyteorg/flytekit/blob/1ca3d91876002534f656688c327f7411ad3f1f89/plugins/flytekit-spark/flytekitplugins/spark/task.py#L152
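
For context, this is roughly how the existing PySpark plugin is used today; a generic Scala/Java task would presumably hang off the same task_config mechanism:

```python
from flytekit import task
from flytekitplugins.spark import Spark


@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1g",
            "spark.executor.instances": "2",
        }
    )
)
def row_count(partitions: int) -> float:
    import flytekit

    # The plugin creates the SparkSession and exposes it on the execution context.
    sess = flytekit.current_context().spark_session
    count = sess.sparkContext.parallelize(range(partitions)).count()
    return float(count)
```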

kumare3 · May 15 '24 03:05

Thanks @kumare3 for the welcome and for sharing helpful references!

I was reading through the Databricks agent (which runs in the backend, I believe?) and found the part where the SparkJob protobuf is converted into a Databricks API request.

  1. First, the agent constructs an API call with a spark_python_task key hard-coded into the payload (same in the go-plugin). I believe the agent will need a change to set the right key (a rough payload sketch follows this list).

  2. It's unclear to me whether a Spark context needs to be created before launching a Java/Scala job. The Databricks docs say:

     "The [main class] must use SparkContext.getOrCreate to obtain a Spark context; otherwise, runs of the job fail." (Databricks docs, spark_jar_task.main_class_name)

  3. Finally, command-line arguments for the main class could live either in the databricksConf object in the task template (as spark_jar_task.parameters) or in the inputs parameter of the DatabricksAgent.create call. On this I have a newbie question: to allow values from prior steps in a workflow to be inserted into the template, does Flyte typically use inputs?
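
Concretely, I would expect the agent to end up posting something like the following for a JAR task. This is a hand-written sketch based on the Databricks Jobs runs-submit API; every value is a placeholder, and the cluster spec would presumably come from databricksConf:

```python
# Hypothetical runs-submit request body for a Scala/Java job; values are made up.
jar_run_request = {
    "run_name": "flyte-generic-spark-task",
    "new_cluster": {
        "spark_version": "13.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2,
    },
    "libraries": [{"jar": "dbfs:/FileStore/jars/my-job.jar"}],
    "spark_jar_task": {
        "main_class_name": "com.example.MyJob",
        # Could be rendered from databricksConf (spark_jar_task.parameters)
        # or from the Flyte task inputs.
        "parameters": ["--date", "2024-05-15"],
    },
}
```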

Thanks for your guidance on this!

kochhar-mw · May 15 '24 05:05