spline-spark-agent icon indicating copy to clipboard operation
spline-spark-agent copied to clipboard

Add Databricks environment details to execution plan

Open ganeshnikumbh opened this issue 4 years ago • 10 comments

@wajda @cerveada , after taking some inspiration from Openlineage installtion steps, I was able to do codeless spline installation on Databricks (please see attached document for detailed steps). What I am missing now is to get some additional details from Databricks cluster like notebook name, user etc.. is it possible to build this functionality in spline jar , so we don't need run extra code snippet?

This has been implemented in openlineage using a databricks environment facet builder. I hope this can be done in spline as well.

Install_Spline_Codeless.docx

Shell_Scripts.zip

ganeshnikumbh avatar Mar 01 '22 13:03 ganeshnikumbh

Thank you very much @ganeshnikumbh, that is awesome! We'll review it and include into the related how-to documentation.. Or if you want to blog about it somewhere on Medium or other resource, we'll reference your blog in our readme.

wajda avatar Mar 02 '22 13:03 wajda

@wajda I was kind of hoping you will comment on building the feature for adding Databricks environment details into spline spark agent code :) .. till now I was using below code to add this extra metadata. But I am sure this can built right into the main code. Any thoughts?

%scala
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model.v1_1._
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

val splineConf: Configuration = StandardSplineConfigurationStack(spark)


spark.enableLineageTracking(new DefaultSplineConfigurer(spark,splineConf) {
  //override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
  //val test = dbutils.notebook.getContext.notebookPath
  val notebookInformationJson = dbutils.notebook.getContext.toJson
  val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
  val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]

  val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
  val notebookPath = extraContextMap("notebook_path").split("/")
  
  val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
  val user = tagMap("user")
  val name = notebookPath(notebookPath.size-1)
  
  val notebookInfo = Map("notebookURL" -> notebookURL,  
                "user" -> user, 
                "name" -> name,
                "mounts" -> dbutils.fs.ls("/FileStore/tables").map(_.path),
                "timestamp" -> System.currentTimeMillis)
  val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
  
  override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
    override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
    override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
    override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
    override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
    override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
  })
})

ganeshnikumbh avatar Mar 02 '22 13:03 ganeshnikumbh

I was kind of hoping you will comment on building the feature for adding Databricks environment details

Sorry, cannot comment on everything :) We don't have any need for such a plugin at the moment, but will be happy to accept a pull-request. We have a somewhat similar feature - MetadataCollectingFilter - it doesn't support Databricks environment specifically, but could be enhanced to support the use-case. It's in the develop branch at the moment. The usage example can be seen in the test case.

wajda avatar Mar 02 '22 15:03 wajda

Hi @ganeshnikumbh Did you succeed to get the notebook name in the respose of the cosumer api? if yes- i would be happy to know how

zacayd avatar Dec 15 '22 13:12 zacayd

Hi i added the following code as new cell on databricks .but each time i get an error the spline agent- is za.co.absa.spline.agent.spark:spark-3.1-spline-agent-bundle_2.12:0.7.11

can you advise?

command-571735188155188:2: error: object extra is not a member of package za.co.absa.spline.harvester
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
                                   ^
command-571735188155188:3: error: object extra is not a member of package za.co.absa.spline.harvester
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
                                   ^
command-571735188155188:7: error: object DefaultSplineConfigurer is not a member of package za.co.absa.spline.harvester.conf
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
       ^
command-571735188155188:8: error: object v1_1 is not a member of package za.co.absa.spline.producer.model
import za.co.absa.spline.producer.model.v1_1._
                                        ^
command-571735188155188:14: error: za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack.type does not take parameters
val splineConf: Configuration = StandardSplineConfigurationStack(spark)
                                                                ^
command-571735188155188:17: error: not found: type DefaultSplineConfigurer
spark.enableLineageTracking(new DefaultSplineConfigurer(spark,splineConf) {
                                ^
command-571735188155188:42: error: not found: type UserExtraMetadataProvider
  override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
                                                                ^
command-571735188155188:42: error: not found: type UserExtraMetadataProvider
  override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
                                                                                                      ^
command-571735188155188:17: error: type mismatch;
 found   : <error>{}
 required: za.co.absa.spline.agent.AgentConfig
spark.enableLineageTracking(new DefaultSplineConfigurer(spark,splineConf) {
                            ^
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.extra.UserExtraMetadataProvider
import za.co.absa.spline.harvester.extra.UserExtraAppendingPostProcessingFilter
import za.co.absa.spline.harvester.HarvestingContext
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.producer.model.v1_1._
import za.co.absa.spline.producer.model._
import scala.util.parsing.json.JSON
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

val splineConf: Configuration = StandardSplineConfigurationStack(spark)


spark.enableLineageTracking(new DefaultSplineConfigurer(spark,splineConf) {
  //override protected def userExtraMetadataProvider = new UserExtraMetaDataProvider {
  //val test = dbutils.notebook.getContext.notebookPath
  val notebookInformationJson = dbutils.notebook.getContext.toJson
  val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
  val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]

  val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
  val notebookPath = extraContextMap("notebook_path").split("/")
  
  val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
  val user = tagMap("user")
  val name = notebookPath(notebookPath.size-1)
  
  val notebookInfo = Map("notebookURL" -> notebookURL,  
                "user" -> user, 
                "name" -> name,
                "mounts" -> dbutils.fs.ls("/FileStore/tables").map(_.path),
                "timestamp" -> System.currentTimeMillis)
  val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
  
  override protected def maybeUserExtraMetadataProvider: Option[UserExtraMetadataProvider] = Some(new UserExtraMetadataProvider() {
    override def forExecEvent(event: ExecutionEvent, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar1")
    override def forExecPlan(plan: ExecutionPlan, ctx: HarvestingContext): Map[String, Any] = Map("notebookInfo" -> notebookInfoJson)
    override def forOperation(op: ReadOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar3")
    override def forOperation(op: WriteOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar4")
    override def forOperation(op: DataOperation, ctx: HarvestingContext): Map[String, Any] = Map("foo" -> "bar5")
  })
})

zacayd avatar Jan 25 '23 09:01 zacayd

HI @ganeshnikumbh can you Share code for getting the notebook name and path in scala that will be written to the spline that fits to 1.0 and higher since the code you wrote is no longer valid Databricks has removed the libraries

zacayd avatar Mar 01 '23 11:03 zacayd

@zacayd we are not using the scala code anymore as it causes changes to the notebooks. As mentioned above we are doing codeless installation and using the default lineage output.

ganeshnikumbh avatar Mar 01 '23 17:03 ganeshnikumbh

so how can you get the name of the notebook or you are have no use of it?

zacayd avatar Mar 01 '23 17:03 zacayd

Hi @ganeshnikumbh did you succeed to create a codless solution that can inject the info about the Databricks infor like name of notebook, url of the workspace to the execution plan if yes-can you please share it with us?

zacayd avatar Jan 12 '24 09:01 zacayd

Hi @ganeshnikumbh

did you succeed to create a codless solution that can inject the info about the Databricks infor like name of notebook,

url of the workspace to the execution plan

if yes-can you please share it with us?

Sorry I was not successful. I tried but it is not working.

ganeshnikumbh avatar Jan 12 '24 10:01 ganeshnikumbh