[wip] generic spark task
Signed-off-by: Ketan Umare [email protected]
TL;DR
WIP skeleton for a generic Spark task, intended to support Spark applications beyond the current PySpark-only flow (e.g. Scala/Java jobs).
Type
- [ ] Bug Fix
- [ ] Feature
- [ ] Plugin
Are all requirements met?
- [ ] Code completed
- [ ] Smoke tested
- [ ] Unit tests added
- [ ] Code documentation added
- [ ] Any pending items have an associated Issue
Complete description
How did you fix the bug or implement the feature? Link to any design docs, etc.
Tracking Issue
https://github.com/lyft/flyte/issues/
Follow-up issue
NA
OR
https://github.com/lyft/flyte/issues/
Codecov Report
Merging #767 (880f6a1) into master (df6a949) will decrease coverage by 0.00%. The diff coverage is n/a.

    @@            Coverage Diff             @@
    ##           master     #767      +/-   ##
    ==========================================
    - Coverage   85.80%   85.80%   -0.01%
    ==========================================
      Files         343      343
      Lines       29428    29440      +12
      Branches     2427     2428       +1
    ==========================================
    + Hits        25251    25261      +10
    - Misses       3531     3533       +2
      Partials      646      646

| Impacted Files | Coverage Δ | |
|---|---|---|
| tests/flytekit/unit/core/test_map_task.py | 94.00% <0.00%> (-1.56%) | :arrow_down: |
| flytekit/core/task.py | 80.48% <0.00%> (ø) | |
| flytekit/core/map_task.py | 81.63% <0.00%> (+0.38%) | :arrow_up: |
Continue to review full report at Codecov.
Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update df6a949...880f6a1. Read the comment docs.
@kumare3 I'd like to try to update this PR to support launching Scala/Java jobs on a Databricks cluster. I think I could build on the skeleton here, but I'm not too sure about a couple of details since I've just started using Flyte this week!
I see there's an abstraction for a `SparkJob` which could be used, and its `spark_type` will need to be set correctly. What's not so clear is:
- How is the `SparkJob` class used to execute a command? I.e., where in Flyte do the contents of this class get converted into a command? (See the sketch below for the shape I think it needs to end up in.)
- How are the Databricks parameters used to construct the API request?
Any help on these questions will help a lot for me to contribute generic Spark capabilities to Flyte.
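To make the first question concrete, this is my (possibly off) reading of the shape the plugin would ultimately have to produce, based on flyteidl's spark.proto; the field names and enum value below are my assumptions from reading the proto, not something this PR defines:

```python
# Sketch only: my reading of flyteidl's SparkJob message for a Scala/Java job.
# Field names may be slightly off; the point is that applicationType must move
# away from PYTHON and a main class/JAR must be carried along.
from flyteidl.plugins import spark_pb2

job = spark_pb2.SparkJob(
    applicationType=spark_pb2.SparkApplication.SCALA,
    mainApplicationFile="local:///opt/spark/jars/etl-assembly.jar",
    mainClass="com.example.etl.Main",  # JVM entry point of the job
    sparkConf={"spark.executor.instances": "4"},
)
```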
@kochhar-mw firstly welcome and thank you for offering to contribute.
> How are the Databricks parameters used to construct the API request?

For the Databricks API call, refer to the Databricks agent; that is how the config is marshalled to the backend and on to the agent.
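From memory, the shape is roughly this (a simplified sketch, not the actual agent code; the custom-struct keys, endpoint version, and merging behaviour should be double-checked against the agent):

```python
import http.client
import json

# Simplified sketch: the SparkJob custom config (including databricksConf)
# arrives in task_template.custom, and the agent turns it into a Databricks
# Jobs Runs Submit request.
def submit_databricks_run(custom: dict, databricks_instance: str, token: str) -> dict:
    # databricksConf carries the user-supplied cluster/job spec from the task config.
    payload = dict(custom.get("databricksConf", {}))
    # Today a spark_python_task key is hard-coded here (same in the Go plugin);
    # a generic Spark task would have to pick spark_jar_task etc. instead.
    payload["spark_python_task"] = {"python_file": custom.get("mainApplicationFile", "")}

    conn = http.client.HTTPSConnection(databricks_instance)
    conn.request(
        "POST",
        "/api/2.1/jobs/runs/submit",
        body=json.dumps(payload),
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
    )
    return json.loads(conn.getresponse().read())
```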
This is where the Spark context gets created: https://github.com/flyteorg/flytekit/blob/1ca3d91876002534f656688c327f7411ad3f1f89/plugins/flytekit-spark/flytekitplugins/spark/task.py#L69-L98
All the CLI handling is done in the base class: https://github.com/flyteorg/flytekit/blob/1ca3d91876002534f656688c327f7411ad3f1f89/flytekit/core/python_auto_container.py#L132
And the magic of Spark happens through the container image and https://github.com/flyteorg/flytekit/blob/1ca3d91876002534f656688c327f7411ad3f1f89/plugins/flytekit-spark/flytekitplugins/spark/task.py#L152
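For orientation, the user-facing side of that existing PySpark flow looks like this (a minimal sketch of standard flytekit-spark usage; the JVM path you want would need an analogous task config carrying a JAR and main class):

```python
import flytekit
from flytekit import task
from flytekitplugins.spark import Spark

# Minimal example of today's PySpark-only flow, showing where the pieces above
# come together: spark_conf ends up in the SparkJob custom config, and
# pre_execute creates the SparkSession before the task body runs.
@task(
    task_config=Spark(
        spark_conf={"spark.executor.instances": "2"},
    )
)
def count_elements(n: int) -> int:
    # The plugin has already attached the SparkSession to the Flyte context.
    sess = flytekit.current_context().spark_session
    return sess.sparkContext.parallelize(range(n)).count()
```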
Thanks @kumare3 for the welcome and for sharing helpful references!
I was reading through the Databricks agent (which runs in the backend, I believe?) and found the part where the SparkJob protobuf is converted into a Databricks API request.
- First, the agent constructs an API call with a `spark_python_task` key hard-coded into the payload (same in the Go plugin). I believe the agent will need a change to set the right key; a rough sketch of the payload I have in mind is after this list.
- Second, it's unclear to me whether a Spark context needs to be created before launching a Java/Scala job. The Databricks docs for `spark_jar_task.main_class_name` say:
  > The [main class] must use `SparkContext.getOrCreate` to obtain a Spark context; otherwise, runs of the job fail.
- Finally, command-line arguments for the main class could come either from the `databricksConf` object in the task template (as `spark_jar_task.parameters`) or from the `inputs` parameter of the `DatabricksAgent.create` call. On this I have a newbie question: to take values from prior steps in a workflow and insert them into a template, does Flyte typically use `inputs`?
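For the first and last points, this is the payload shape I have in mind, based on the Databricks Jobs Runs Submit API docs (the exact API version and how `databricksConf` gets merged are my assumptions, not something I've verified in the agent code):

```python
# What gets built today (roughly): the user's databricksConf plus a hard-coded
# spark_python_task entry.
python_payload = {
    "run_name": "flyte-task",
    "new_cluster": {"spark_version": "13.3.x-scala2.12", "num_workers": 2},
    "spark_python_task": {
        "python_file": "dbfs:/path/to/entrypoint.py",
        "parameters": ["--arg1", "value1"],
    },
}

# What a Scala/Java job would need instead: a spark_jar_task entry, whose main
# class must call SparkContext.getOrCreate per the Databricks docs quoted above.
jar_payload = {
    "run_name": "flyte-task",
    "new_cluster": {"spark_version": "13.3.x-scala2.12", "num_workers": 2},
    "libraries": [{"jar": "dbfs:/jars/etl-assembly.jar"}],
    "spark_jar_task": {
        "main_class_name": "com.example.etl.Main",
        # This is where per-execution values (e.g. Flyte inputs) would have to land.
        "parameters": ["--input", "s3://bucket/data"],
    },
}
```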
Thanks for your guidance on this!