Add Databricks environment details to execution plan
@wajda @cerveada, after taking some inspiration from the OpenLineage installation steps, I was able to do a codeless Spline installation on Databricks (please see the attached document for detailed steps). What I am missing now is some additional details from the Databricks cluster, like the notebook name, user, etc. Is it possible to build this functionality into the Spline jar, so we don't need to run an extra code snippet?
This has been implemented in OpenLineage using a Databricks environment facet builder. I hope this can be done in Spline as well.
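For context, the codeless setup boils down to attaching the agent bundle jar to the cluster and registering the Spline listener through the cluster's Spark config, roughly like this (the producer URL property name differs between agent versions, so treat the second line as an assumption and verify it against the agent documentation):

spark.sql.queryExecutionListeners za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.lineageDispatcher.http.producer.url http://<spline-gateway-host>:8080/producer

With that in place, lineage is captured without any notebook changes; the only thing missing is the Databricks-specific context described above.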
Thank you very much @ganeshnikumbh, that is awesome! We'll review it and include it in the related how-to documentation. Or, if you want to blog about it on Medium or another resource, we'll reference your blog in our README.
@wajda I was kind of hoping you would comment on building the feature for adding Databricks environment details into the Spline Spark agent code :) Until now I have been using the code below to add this extra metadata, but I am sure this could be built right into the main code. Any thoughts?
%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model.v1_1._
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

val splineConf: Configuration = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(spark, splineConf) {
  // Collect notebook metadata from the Databricks notebook context
  val notebookInformationJson = dbutils.notebook.getContext.toJson
  val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(Map.empty).asInstanceOf[Map[String, Any]]
  val tagMap = outerMap("tags").asInstanceOf[Map[String, String]]
  val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String, String]]
  val notebookPath = extraContextMap("notebook_path").split("/")

  val notebookURL = tagMap("browserHostName") + "/?o=" + tagMap("orgId") + tagMap("browserHash")
  val user = tagMap("user")
  val name = notebookPath(notebookPath.size - 1)

  val notebookInfo = Map(
    "notebookURL" -> notebookURL,
    "user" -> user,
    "name" -> name,
    "mounts" -> dbutils.fs.ls("/FileStore/tables").map(_.path),
    "timestamp" -> System.currentTimeMillis)
  val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)

  // Attach the notebook info to the execution plan; the remaining values are placeholders
  override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
    override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
    override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
    override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
    override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
    override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
  })
})
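For what it's worth, the map returned from forExecPlan is appended to the execution plan's extra attributes, so the notebookInfo object should show up on the plan itself (and be retrievable through the consumer API); the forExecEvent/forOperation values above are just placeholders.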
I was kind of hoping you would comment on building the feature for adding Databricks environment details
Sorry, cannot comment on everything :) We don't have any need for such a plugin at the moment, but we will be happy to accept a pull request. We have a somewhat similar feature, MetadataCollectingFilter. It doesn't support the Databricks environment specifically, but it could be enhanced to support this use case. It's in the develop branch at the moment, and a usage example can be seen in the test case.
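For illustration only: the idea behind MetadataCollectingFilter is a set of declarative rules telling the agent which extra values to attach to the captured lineage, so a Databricks flavour of it would mostly mean teaching those rules (or a small plugin) to resolve the notebook context the way the Scala snippet above does. A hypothetical rules sketch follows; the exact key names, the $env/$js expression syntax, and the DB_CLUSTER_ID variable are all assumptions to be checked against the agent README and the test case:

{
  "executionPlan": {
    "extra": {
      "environment": "databricks",
      "clusterId": { "$env": "DB_CLUSTER_ID" },
      "capturedAtMs": { "$js": "new java.util.Date().getTime()" }
    }
  }
}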
Hi @ganeshnikumbh, did you succeed in getting the notebook name in the response of the consumer API? If yes, I would be happy to know how.
Hi, I added the following code as a new cell on Databricks, but each time I get an error. The Spline agent is za.co.absa.spline.agent.spark:spark-3.1-spline-agent-bundle_2.12:0.7.11.
Can you advise?
command-571735188155188:2: error: object extra is not a member of package za.co.absa.spline.harvester
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
^
command-571735188155188:3: error: object extra is not a member of package za.co.absa.spline.harvester
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
^
command-571735188155188:7: error: object DefaultSplineConfigurer is not a member of package za.co.absa.spline.harvester.conf
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
^
command-571735188155188:8: error: object v1_1 is not a member of package za.co.absa.spline.producer.model
import za.co.absa.spline.producer.model.v1_1._
^
command-571735188155188:14: error: za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack.type does not take parameters
val splineConf: Configuration = StandardSplineConfigurationStack(spark)
^
command-571735188155188:17: error: not found: type DefaultSplineConfigurer
spark.enableLineageTracking(new DefaultSplineConfigurer(spark,splineConf) {
^
command-571735188155188:42: error: not found: type UserExtraMetadataProvider
override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
^
command-571735188155188:42: error: not found: type UserExtraMetadataProvider
override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
^
command-571735188155188:17: error: type mismatch;
found : <error>{}
required: za.co.absa.spline.agent.AgentConfig
spark.enableLineageTracking(new DefaultSplineConfigurer(spark,splineConf) {
^
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model.v1_1._
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

val splineConf: Configuration = StandardSplineConfigurationStack(spark)

spark.enableLineageTracking(new DefaultSplineConfigurer(spark, splineConf) {
  //override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
  //val test = dbutils.notebook.getContext.notebookPath
  val notebookInformationJson = dbutils.notebook.getContext.toJson
  val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String, String]]
  val tagMap = outerMap("tags").asInstanceOf[Map[String, String]]
  val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String, String]]
  val notebookPath = extraContextMap("notebook_path").split("/")
  val notebookURL = tagMap("browserHostName") + "/?o=" + tagMap("orgId") + tagMap("browserHash")
  val user = tagMap("user")
  val name = notebookPath(notebookPath.size - 1)
  val notebookInfo = Map(
    "notebookURL" -> notebookURL,
    "user" -> user,
    "name" -> name,
    "mounts" -> dbutils.fs.ls("/FileStore/tables").map(_.path),
    "timestamp" -> System.currentTimeMillis)
  val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)

  override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
    override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
    override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
    override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
    override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
    override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
  })
})
Hi @ganeshnikumbh, can you share Scala code for getting the notebook name and path written to Spline that works with agent 1.0 and higher? The code you wrote is no longer valid, as the libraries have been removed.
@zacayd we are not using the Scala code anymore, as it requires changing the notebooks. As mentioned above, we are doing a codeless installation and using the default lineage output.
So how do you get the name of the notebook, or do you have no use for it?
Hi @ganeshnikumbh, did you succeed in creating a codeless solution that can inject the Databricks info, like the notebook name and the workspace URL, into the execution plan? If yes, can you please share it with us?
Sorry, I was not successful. I tried, but it is not working.