SANSA query (Spark) program fails when integrated with Spark Job Server
Hello Team, I am able to run a basic SANSA query example on its own. But when I integrate the same code with Spark Job Server, it fails with the exception below:
org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:820)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:818)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:818)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1732)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1651)
    at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1923)

These are the steps I am following:
1) Extend the SparkSessionJob class and override its methods
2) Put the SANSA querying code inside the runJob method
3) Build the jar and upload it to Spark Job Server (SJS) using an API (a rough sketch of the calls follows this list)
4) Trigger the Spark job using another API
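For steps 3 and 4, the two REST calls I make are roughly like the sketch below (shown here via scala.sys.process from a driver shell; the host, app name, context name, and jar path are placeholders, and I assume the standard Spark Job Server /binaries and /jobs endpoints with a pre-created session context):

import scala.sys.process._

// Placeholder values -- adjust to the actual deployment
val sjs = "http://localhost:8090"
val jar = "target/scala-2.11/sansa-query-example-assembly.jar" // hypothetical jar path

// Step 3: upload the assembled jar as a binary named "sansa-example"
Seq("curl", "-X", "POST",
    "-H", "Content-Type: application/java-archive",
    "--data-binary", s"@$jar",
    s"$sjs/binaries/sansa-example").!!

// Step 4: trigger the job, passing the input.string config that validate() expects
// (the context is assumed to have been created beforehand as a session context)
Seq("curl", "-X", "POST",
    "-d", "input.string = a b c",
    s"$sjs/jobs?appName=sansa-example&classPath=SansaQueryExample&context=sansa-context&sync=true").!!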
The code snippet I am using is below.
import com.typesafe.config.Config
import net.sansa_stack.query.spark.query._
import net.sansa_stack.rdf.spark.io._
import org.apache.jena.riot.Lang
import org.apache.spark.sql.SparkSession
import org.scalactic._
import scala.util.Try
import spark.jobserver.SparkSessionJob
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem}

object SansaQueryExample extends SparkSessionJob {
  override type JobData = Seq[String]
  override type JobOutput = collection.Map[String, Long]

  override def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config):
    JobData Or Every[ValidationProblem] = {
    Try(config.getString("input.string").split(" ").toSeq)
      .map(words => Good(words))
      .getOrElse(Bad(One(SingleProblem("No input.string param"))))
  }

  override def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput = {
    val input = "hdfs_location_of_input .NT file"
    val sparqlQuery: String = "SELECT * WHERE {?s ?p ?o} LIMIT 10"
    val lang = Lang.NTRIPLES

    // Load the N-Triples file into an RDD of Jena triples
    val graphRdd = sparkSession.rdf(lang)(input)
    graphRdd.collect().foreach(println)

    // Run the SPARQL query and write the result out as CSV
    val result = graphRdd.sparql(sparqlQuery)
    result.write.format("csv").mode("overwrite").save("output_hdfs_location_to_write")

    sparkSession.sparkContext.parallelize(data).countByValue
  }
}
I am using SANSA 0.7.1, Spark Job Server 0.8.0, and Spark 2.4.4.
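For reference, those versions roughly correspond to a dependency setup like the build.sbt sketch below (the artifact names are assumed from the usual SANSA and Spark Job Server coordinates, and Scala 2.11 is assumed since Spark 2.4.4 is built against it; my actual build may differ):

// build.sbt (sketch; the job-server artifacts may need the spark-jobserver repository as an extra resolver)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"         % "2.4.4" % Provided,
  "net.sansa-stack"  %% "sansa-rdf-spark"   % "0.7.1",
  "net.sansa-stack"  %% "sansa-query-spark" % "0.7.1",
  "spark.jobserver"  %% "job-server-api"    % "0.8.0" % Provided,
  "spark.jobserver"  %% "job-server-extras" % "0.8.0" % Provided  // provides SparkSessionJob
)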
Can you paste the full exception, in case the trace above is incomplete?