
[BUG]: UDF Group Apply on Azure DataBricks causes NRE at ArrowColumnVector.getChild

Open jammman opened this issue 5 years ago • 9 comments

Describe the bug The application works locally using spark-submit, but once deployed to Azure Databricks as a Set Jar job it throws a java.lang.NullPointerException at org.apache.spark.sql.vectorized.ArrowColumnVector.getChild(ArrowColumnVector.java:132).

The code works fine when testing locally using spark-submit. We're guessing the issue is related to dependencies on the workers?

We're pretty sure it's not a result of null values in the DataFrame. Also, the JDBC SQL connection works and prints the schema as expected. It only crashes when Show() triggers the UDF.

            DataFrame termsDF = spark.Read()
                .Jdbc(jdbcUrl, "dbo.CountryPopluations", connectionProperties);
            termsDF.PrintSchema();

To Reproduce

Steps to reproduce the behavior:

  1. Deploy .NET Core 3.1 App using Set Jar instructions
  2. Start cluster and job. Code similar to:
            DataFrame birthRatesDF = countriesDF
                .Select("Id",
                        "PopulationCount",
                        "Year",
                        "CountryId")
                .GroupBy("CountryId")
                .Apply(
                    birthratesStructure,
                    r => CalcBirthRates(r,
                        "Id",
                        "PopulationCount",
                        "CountryId"
                    ));

            birthRatesDF.Show(); //Exception thrown here
            birthRatesDF.Write().Mode(SaveMode.Append).Jdbc(jdbcUrl, "dbo.BirthRates", connectionProperties);
            


#if DEBUG
            //// Stop Spark session, but don't call this in prod on Databricks
            spark.Stop();
#endif
        }

        private static RecordBatch CalcBirthRates(RecordBatch salesRecords,
            string idColumnName,
            string populationCountName,
            string countryIdColumnName
            )
        {
            // Simple math calculations elided for brevity; they populate the Arrow
            // builders and record count used below from the grouped input batch.
            var countryIds = new Int32Array.Builder();
            var popuLationGrowths = new StringArray.Builder();
            var years = new StringArray.Builder();
            int recordCount = 0;

            return new RecordBatch(
                new Schema.Builder()
                    .Field(f => f.Name("countryId").DataType(Arrow.Int32Type.Default))
                    .Field(f => f.Name("popuLationGrowth").DataType(Arrow.StringType.Default))
                    .Field(f => f.Name("year").DataType(Arrow.StringType.Default))
                    .Build(),
                new IArrowArray[]
                {
                    countryIds.Build(),
                    popuLationGrowths.Build(),
                    years.Build()
                },
                recordCount);
        }
  3. See error:
[Times: user=0.59 sys=0.03, real=0.16 secs] 
 [Full GC (Metadata GC Threshold) [PSYoungGen: 253426K->0K(1552384K)] [ParOldGen: 471161K->505759K(4273664K)] 724588K->505759K(5826048K), [Metaspace: 160795K->159628K(1189888K)], 0.8390849 secs] [Times: user=2.80 sys=0.01, real=0.84 secs] 
[...] [...] [Error] [JvmBridge] JVM method execution failed: Nonstatic method showString failed for class 26 when called with 3 arguments ([Index=1, Type=Int32, Value=20], [Index=2, Type=Int32, Value=20], [Index=3, Type=Boolean, Value=False], )
[..] [...] [Error] [JvmBridge] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 10.139.64.5, executor 1): java.lang.NullPointerException
	at org.apache.spark.sql.vectorized.ArrowColumnVector.getChild(ArrowColumnVector.java:132)
	at org.apache.spark.sql.execution.python.FlatMapGroupsInPandasExec$$anonfun$doExecute$2$$anonfun$apply$2$$anonfun$4.apply(FlatMapGroupsInPandasExec.scala:155)
	at org.apache.spark.sql.execution.python.FlatMapGroupsInPandasExec$$anonfun$doExecute$2$$anonfun$apply$2$$anonfun$4.apply(FlatMapGroupsInPandasExec.scala:155)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.python.FlatMapGroupsInPandasExec$$anonfun$doExecute$2$$anonfun$apply$2.apply(FlatMapGroupsInPandasExec.scala:155)
	at org.apache.spark.sql.execution.python.FlatMapGroupsInPandasExec$$anonfun$doExecute$2$$anonfun$apply$2.apply(FlatMapGroupsInPandasExec.scala:152)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
	at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
	at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
	at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
	at org.apache.spark.scheduler.Task.run(Task.scala:113)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Expected behavior We expect the program to run successfully, as it does locally using spark-submit. The database connection is the same for both environments and, as mentioned above, we can confirm it connects.

**Environment:**

  • .NET Core 3.1
  • Azure Databricks Cluster: 6.4 (includes Apache Spark 2.4.5, Scala 2.11)
  • Microsoft.Spark.Worker.netcoreapp3.1.linux-x64-0.10.0
  • microsoft-spark-2.4.x-0.10.0.jar

jammman avatar Mar 31 '20 22:03 jammman

Thanks @jammman for reporting this. Can you run the following and share the outputs?

countriesDF.Select("Id", "PopulationCount", "Year", "CountryId").PrintSchema();

birthRatesDF.PrintSchema();

birthRatesDF.Explain(true);

The local Spark version is also 2.4.5 right? We will also try to repro this on our side. cc @elvaliuliuliu

imback82 avatar Mar 31 '20 23:03 imback82

Thanks @imback82. Local Spark was 2.4.1; I upgraded it to 2.4.5 and it still works locally. The above was sample code; here is the actual code (it contains some of our real identifiers):

Should also point out we're using

  • mssql-jdbc-8.2.0.jre8.jar

Actual Code:

           DataFrame termsDF = spark.Read()
               .Jdbc(jdbcUrl, "dbo.ViewCatalogFixedTermssTest", connectionProperties);
           termsDF.PrintSchema();

           var ledgerStructure = new StructType(new[]
                   {
                               new StructField("termId", new SqlTypes.IntegerType()),
                               new StructField("ledgerAccountId", new SqlTypes.StringType()),
                               new StructField("sourceAccountId", new SqlTypes.StringType()),
                               new StructField("targetAccountId", new SqlTypes.StringType()),
                               new StructField("transactionTypeId", new SqlTypes.StringType()),
                               new StructField("amount", new SqlTypes.DoubleType()),
                               new StructField("statusTypeId", new SqlTypes.StringType()),
                               new StructField("memo", new SqlTypes.StringType()),
                               new StructField("dateUtc", new SqlTypes.DateType()),
                               new StructField("statusChangeDateUtc", new SqlTypes.DateType())

                   }
           );

           // Grouped Map Vector UDF
           // able to return different shapes and record lengths
           //Initial deposits...

           DataFrame initialDepositsDF = termsDF
               .Select("ItemId",
                       "GrossSales",
                       "ItemAccountId")
               .GroupBy("ItemAccountId")
               .Apply(
                   ledgerStructure,
                   r => CalcInitialDeposits(r,
                       "ItemId",
                       "GrossSales",
                       "ItemAccountId"
                   ));

           termsDF.Select("ItemId",
                       "GrossSales",
                       "ItemAccountId").PrintSchema();

           initialDepositsDF.PrintSchema();

           initialDepositsDF.Explain(true);

           initialDepositsDF.Show();
           initialDepositsDF.Write().Mode(SaveMode.Append).Jdbc(jdbcUrl, "dbo.AccountLedger", connectionProperties);
           
           // Grouped Map Vector UDF
           // able to return different shapes and record lengths
           //Flat rates...
           DataFrame flatRateTransfersDF = termsDF
               .Select("ItemId",
                       "TermsId",
                       "GrossSales",
                       "Rate",
                       "ItemAccountId",
                       "FromAccountId",
                       "ToAccountId")
               .GroupBy("ItemId")
               .Apply(
                   ledgerStructure,
                   r => CalcFlatRateLedger(r,
                       "ItemId",
                       "TermsId",
                       "GrossSales",
                       "Rate",
                       "ItemAccountId",
                       "FromAccountId",
                       "ToAccountId"
                   ));

           flatRateTransfersDF.Show();
           flatRateTransfersDF.Write().Mode(SaveMode.Append).Jdbc(jdbcUrl, "dbo.AccountLedger", connectionProperties);
           //dataFrameItemSales.Show();

#if DEBUG
           //// Stop Spark session, but don't call this in prod on Databricks
           spark.Stop();
#endif
       }

       private static RecordBatch CalcInitialDeposits(RecordBatch salesRecords,
           string itemIdColumnName,
           string grossSalesColumnName,
           string itemAccountIdColumnName
           )
       {
           var termResults = new InitialDepositTermsCollection();

           try
           {
               var itemIdColumn = salesRecords.Schema.GetFieldIndex(itemIdColumnName);
               var itemId = salesRecords.Column(itemIdColumn) as StringArray;
               var grossSalesColumn = salesRecords.Schema.GetFieldIndex(grossSalesColumnName);
               var grossSales = salesRecords.Column(grossSalesColumn) as DoubleArray;
               var itemAccountIdColumn = salesRecords.Schema.GetFieldIndex(itemAccountIdColumnName);
               var itemAccountId = salesRecords.Column(itemAccountIdColumn) as StringArray;

               //Agg sum for total sales() for this account (based on the grouping)
               for (int i = 0; i < grossSales.Length; i++)
               {
                   termResults.Results(itemAccountId.GetString(i)).Result += grossSales.GetValue(i).GetValueOrDefault();
               }

               if (itemId.Length == 0)
               {
                   return termResults.ToRecordBatch(); //should be zero rows
               }
           }
           catch (Exception ex)
           {
               // Rethrow with "throw;" so the original stack trace is preserved
               // ("throw ex;" would reset it).
               throw;
           }


           return termResults.ToRecordBatch(); //Helper class builds the record batch
       }
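
ToRecordBatch() belongs to our own helper class and is not included here. For illustration only, a minimal, hypothetical sketch of what that kind of helper can look like with the Apache.Arrow builders is below; it builds just three of the ten ledger columns (the remaining columns would be appended the same way so the output matches ledgerStructure exactly), and the dictionary parameter and class name are assumptions, not our actual collection type:

    using System.Collections.Generic;
    using Apache.Arrow;
    using Apache.Arrow.Types;

    internal static class LedgerBatchSketch
    {
        // Hypothetical sketch only; not the actual InitialDepositTermsCollection.ToRecordBatch().
        // Builds a RecordBatch for three of the ten ledger columns from per-account totals.
        public static RecordBatch Build(IReadOnlyDictionary<string, double> totalsByAccountId)
        {
            var termIds = new Int32Array.Builder();
            var ledgerAccountIds = new StringArray.Builder();
            var amounts = new DoubleArray.Builder();

            foreach (var pair in totalsByAccountId)
            {
                termIds.Append(0);                 // placeholder termId for the sketch
                ledgerAccountIds.Append(pair.Key); // the grouped ItemAccountId
                amounts.Append(pair.Value);        // aggregated GrossSales for that account
            }

            var schema = new Schema.Builder()
                .Field(f => f.Name("termId").DataType(Int32Type.Default))
                .Field(f => f.Name("ledgerAccountId").DataType(StringType.Default))
                .Field(f => f.Name("amount").DataType(DoubleType.Default))
                .Build();

            return new RecordBatch(
                schema,
                new IArrowArray[] { termIds.Build(), ledgerAccountIds.Build(), amounts.Build() },
                totalsByAccountId.Count);
        }
    }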

Running on Azure Databricks.

[Times: user=0.40 sys=0.00, real=0.11 secs] 
2020-04-01T15:44:13.928+0000: [Full GC (Metadata GC Threshold) [PSYoungGen: 340656K->0K(1704448K)] [ParOldGen: 383135K->506580K(4273664K)] 723792K->506580K(5978112K), [Metaspace: 159656K->158489K(1189888K)], 0.7469266 secs] [Times: user=2.40 sys=0.04, real=0.75 secs] 
root
 |-- ItemId: string (nullable = true)
 |-- CatalogId: integer (nullable = true)
 |-- CatalogItemId: string (nullable = true)
 |-- AddedDateTimeUtc: timestamp (nullable = true)
 |-- Name: string (nullable = true)
 |-- ItemType: string (nullable = true)
 |-- InitialBatchId: string (nullable = true)
 |-- ItemAccountId: string (nullable = true)
 |-- CreatingUserId: string (nullable = true)
 |-- RuleType: string (nullable = true)
 |-- TermsAccountId: string (nullable = true)
 |-- FromAccountId: string (nullable = true)
 |-- ToAccountId: string (nullable = true)
 |-- TermsId: integer (nullable = true)
 |-- Rate: double (nullable = true)
 |-- StartDateUtc: timestamp (nullable = true)
 |-- EndDateUtd: timestamp (nullable = true)
 |-- Enabled: boolean (nullable = true)
 |-- CreatedDateTimeUtc: timestamp (nullable = true)
 |-- TermCatalogId: integer (nullable = true)
 |-- TermItemId: string (nullable = true)
 |-- Id: integer (nullable = true)
 |-- BatchId: string (nullable = true)
 |-- BatchStartDateTimeUtc: date (nullable = true)
 |-- BatchEndDateTimeUtc: date (nullable = true)
 |-- AccountId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- SalesPeriod: date (nullable = true)
 |-- PostedDate: date (nullable = true)
 |-- StoreName: string (nullable = true)
 |-- ChannelCompany: string (nullable = true)
 |-- ChannelName: string (nullable = true)
 |-- Channel: string (nullable = true)
 |-- CountryOfUse: string (nullable = true)
 |-- RegionOfUse: string (nullable = true)
 |-- CityOfUse: string (nullable = true)
 |-- Artist: string (nullable = true)
 |-- ProjectType: string (nullable = true)
 |-- ProjectTitle: string (nullable = true)
 |-- ProjectId: string (nullable = true)
 |-- ItemTitle: string (nullable = true)
 |-- PrimaryPublisher: string (nullable = true)
 |-- Label: string (nullable = true)
 |-- UPC: string (nullable = true)
 |-- PrimaryISWC: string (nullable = true)
 |-- PrimaryISRC: string (nullable = true)
 |-- SourceName: string (nullable = true)
 |-- SourceID: string (nullable = true)
 |-- SaleType: string (nullable = true)
 |-- Units: integer (nullable = true)
 |-- PricePerUnit: double (nullable = true)
 |-- GrossSales: double (nullable = true)
 |-- ChannelAdminFee: double (nullable = true)
 |-- NetSales: double (nullable = true)
 |-- NetSalesCurrency: string (nullable = true)
 |-- ExchangeRate: double (nullable = true)
 |-- AccountCurrency: string (nullable = true)

root
 |-- ItemId: string (nullable = true)
 |-- GrossSales: double (nullable = true)
 |-- ItemAccountId: string (nullable = true)

root
 |-- termId: integer (nullable = true)
 |-- ledgerAccountId: string (nullable = true)
 |-- sourceAccountId: string (nullable = true)
 |-- targetAccountId: string (nullable = true)
 |-- transactionTypeId: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- statusTypeId: string (nullable = true)
 |-- memo: string (nullable = true)
 |-- dateUtc: date (nullable = true)
 |-- statusChangeDateUtc: date (nullable = true)

== Parsed Logical Plan ==
FlatMapGroupsInPandas [ItemAccountId#11], Apache.Arrow.RecordBatch <Main>b__0_0(Apache.Arrow.RecordBatch)(ItemId#4, GrossSales#55, ItemAccountId#11), [termId#125, ledgerAccountId#126, sourceAccountId#127, targetAccountId#128, transactionTypeId#129, amount#130, statusTypeId#131, memo#132, dateUtc#133, statusChangeDateUtc#134]
+- Project [ItemAccountId#11, ItemId#4, GrossSales#55, ItemAccountId#11]
   +- Project [ItemId#4, GrossSales#55, ItemAccountId#11]
      +- Relation[ItemId#4,CatalogId#5,CatalogItemId#6,AddedDateTimeUtc#7,Name#8,ItemType#9,InitialBatchId#10,ItemAccountId#11,CreatingUserId#12,RuleType#13,TermsAccountId#14,FromAccountId#15,ToAccountId#16,TermsId#17,Rate#18,StartDateUtc#19,EndDateUtd#20,Enabled#21,CreatedDateTimeUtc#22,TermCatalogId#23,TermItemId#24,Id#25,BatchId#26,BatchStartDateTimeUtc#27,... 33 more fields] JDBCRelation(dbo.ViewCatalogFixedTermssTest) [numPartitions=1]

== Analyzed Logical Plan ==
termId: int, ledgerAccountId: string, sourceAccountId: string, targetAccountId: string, transactionTypeId: string, amount: double, statusTypeId: string, memo: string, dateUtc: date, statusChangeDateUtc: date
FlatMapGroupsInPandas [ItemAccountId#11], Apache.Arrow.RecordBatch <Main>b__0_0(Apache.Arrow.RecordBatch)(ItemId#4, GrossSales#55, ItemAccountId#11), [termId#125, ledgerAccountId#126, sourceAccountId#127, targetAccountId#128, transactionTypeId#129, amount#130, statusTypeId#131, memo#132, dateUtc#133, statusChangeDateUtc#134]
+- Project [ItemAccountId#11, ItemId#4, GrossSales#55, ItemAccountId#11]
   +- Project [ItemId#4, GrossSales#55, ItemAccountId#11]
      +- Relation[ItemId#4,CatalogId#5,CatalogItemId#6,AddedDateTimeUtc#7,Name#8,ItemType#9,InitialBatchId#10,ItemAccountId#11,CreatingUserId#12,RuleType#13,TermsAccountId#14,FromAccountId#15,ToAccountId#16,TermsId#17,Rate#18,StartDateUtc#19,EndDateUtd#20,Enabled#21,CreatedDateTimeUtc#22,TermCatalogId#23,TermItemId#24,Id#25,BatchId#26,BatchStartDateTimeUtc#27,... 33 more fields] JDBCRelation(dbo.ViewCatalogFixedTermssTest) [numPartitions=1]

== Optimized Logical Plan ==
FlatMapGroupsInPandas [ItemAccountId#11], Apache.Arrow.RecordBatch <Main>b__0_0(Apache.Arrow.RecordBatch)(ItemId#4, GrossSales#55, ItemAccountId#11), [termId#125, ledgerAccountId#126, sourceAccountId#127, targetAccountId#128, transactionTypeId#129, amount#130, statusTypeId#131, memo#132, dateUtc#133, statusChangeDateUtc#134]
+- Project [ItemAccountId#11, ItemId#4, GrossSales#55, ItemAccountId#11]
   +- Relation[ItemId#4,CatalogId#5,CatalogItemId#6,AddedDateTimeUtc#7,Name#8,ItemType#9,InitialBatchId#10,ItemAccountId#11,CreatingUserId#12,RuleType#13,TermsAccountId#14,FromAccountId#15,ToAccountId#16,TermsId#17,Rate#18,StartDateUtc#19,EndDateUtd#20,Enabled#21,CreatedDateTimeUtc#22,TermCatalogId#23,TermItemId#24,Id#25,BatchId#26,BatchStartDateTimeUtc#27,... 33 more fields] JDBCRelation(dbo.ViewCatalogFixedTermssTest) [numPartitions=1]

== Physical Plan ==
FlatMapGroupsInPandas [ItemAccountId#11], Apache.Arrow.RecordBatch <Main>b__0_0(Apache.Arrow.RecordBatch)(ItemId#4, GrossSales#55, ItemAccountId#11), [termId#125, ledgerAccountId#126, sourceAccountId#127, targetAccountId#128, transactionTypeId#129, amount#130, statusTypeId#131, memo#132, dateUtc#133, statusChangeDateUtc#134]
+- Sort [ItemAccountId#11 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(ItemAccountId#11, 200), [id=#34]
      +- *(1) Project [ItemAccountId#11, ItemId#4, GrossSales#55, ItemAccountId#11]
         +- *(1) Scan JDBCRelation(dbo.ViewCatalogFixedTermssTest) [numPartitions=1] [ItemAccountId#11,ItemId#4,GrossSales#55] PushedFilters: [], ReadSchema: struct<ItemAccountId:string,ItemId:string,GrossSales:double>
[2020-04-01T15:45:12.5422078Z] [0331-134133-lured230-10-139-64-6] [Error] [JvmBridge] JVM method execution failed: Nonstatic method showString failed for class 26 when called with 3 arguments ([Index=1, Type=Int32, Value=20], [Index=2, Type=Int32, Value=20], [Index=3, Type=Boolean, Value=False], )
[2020-04-01T15:45:12.5423365Z] [0331-134133-lured230-10-139-64-6] [Error] [JvmBridge] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 10.139.64.5, executor 1): java.lang.NullPointerException
	at org.apache.spark.sql.vectorized.ArrowColumnVector.getChild(ArrowColumnVector.java:132)
	at org.apache.spark.sql.execution.python.FlatMapGroupsInPandasExec$$anonfun$doExecute$2$$anonfun$apply$2$$anonfun$4.apply(FlatMapGroupsInPandasExec.scala:155)
	at org.apache.spark.sql.execution.python.FlatMapGroupsInPandasExec$$anonfun$doExecute$2$$anonfun$apply$2$$anonfun$4.apply(FlatMapGroupsInPandasExec.scala:155)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.python.FlatMapGroupsInPandasExec$$anonfun$doExecute$2$$anonfun$apply$2.apply(FlatMapGroupsInPandasExec.scala:155)
	at org.apache.spark.sql.execution.python.FlatMapGroupsInPandasExec$$anonfun$doExecute$2$$anonfun$apply$2.apply(FlatMapGroupsInPandasExec.scala:152)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
	at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
	at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
	at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
	at org.apache.spark.scheduler.Task.run(Task.scala:113)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

jammman avatar Apr 01 '20 18:04 jammman

Reviewing the Spark code, it looks like this null exception would only happen if ArrowColumnVector.childColumns is null. Since childColumns seems to be initialized in the constructor and only set back to null in the close() method, does that mean close() is being called on my Arrow vectors before they are consumed?

It seems ColumnarBatch closes all of its vectorized columns when it is closed/disposed.
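
For reference, the relevant OSS Spark 2.4 code looks roughly like this (paraphrased from memory, not copied from the Databricks runtime, so details may differ); it shows why getChild would NPE once close() has run:

    // Paraphrase of OSS Spark 2.4's ArrowColumnVector (not the Databricks fork); details may differ.
    public final class ArrowColumnVector extends ColumnVector {
      private final ArrowVectorAccessor accessor;
      private ArrowColumnVector[] childColumns; // populated in the constructor (for struct-typed vectors)

      @Override
      public void close() {
        if (childColumns != null) {
          for (int i = 0; i < childColumns.length; i++) {
            childColumns[i].close();
            childColumns[i] = null;
          }
          childColumns = null; // once nulled out, getChild() below throws NullPointerException
        }
        accessor.close();
      }

      @Override
      public ArrowColumnVector getChild(int ordinal) {
        return childColumns[ordinal]; // ArrowColumnVector.java:132 in the stack trace above
      }
    }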

jammman avatar Apr 01 '20 18:04 jammman

I tried an Azure HDInsight Spark cluster to compare, and I can confirm this works correctly on HDInsight with no code changes, as expected (and hoped). So it does seem to be a bug specific to Azure Databricks.

jammman avatar Apr 02 '20 03:04 jammman

Oh, thanks @jammman for trying this out. We were planning to run the same experiment. :) It also looks like FlatMapGroupsInPandasExec.scala:155 differs from OSS Spark, so there must be something specific to the Databricks runtime. We will open a ticket with the Databricks team and follow up. Btw, did you try this with an earlier version of Spark?

imback82 avatar Apr 02 '20 04:04 imback82

Thanks @jammman for the repro and the comparison with HDI! We will open a ticket with the Databricks team and start a thread, and we will also update the status here. As @imback82 mentioned, it would also help if you could list the Spark versions you tried locally vs. Databricks vs. HDInsight. Thanks!

elvaliuliuliu avatar Apr 02 '20 04:04 elvaliuliuliu

Thanks @elvaliuliuliu and @imback82. So far I have only tried:

  • Local, spark-2.4.1-bin-hadoop2.7, successful
  • Local, spark-2.4.5-bin-hadoop2.7, successful
  • Databricks 6.4 (includes Apache Spark 2.4.5, Scala 2.11), failed
  • HDInsight 2.4, successful

Hope that helps!

jammman avatar Apr 02 '20 06:04 jammman

@imback82 Is there any reason this group apply would fail the same way on a local Windows cluster?

I'm running spark-3.0.0-bin-hadoop3.2.

I see the same error described above when running samples: Microsoft.Spark.Examples.Sql.Batch.VectorUdfs and Microsoft.Spark.Examples.Sql.Batch.VectorDataFrameUdfs

I can try to gather more information if you need anything. It sounded like the conclusion everyone had reached was that this was specific to Databricks, but it may extend beyond that.

dbeavon avatar Mar 26 '21 02:03 dbeavon

I did a bit more testing. As of now I am no longer getting the error after upgrading to Microsoft.Spark.Worker 1.1.1 and the new version of the NuGet package (microsoft-spark-3-0_2.12-1.1.1.jar).

dbeavon avatar Apr 09 '21 16:04 dbeavon