[Mobius 1.6.2] Invalid pickle opcode 0 on ToLocalIterator call
I'm running a basic RDD job against Cassandra with Mobius 1.6.2 and observed the following exception:
17/03/27 12:10:51 INFO Executor: Finished task 0.0 in stage 1.0 (TID 23). 6600 bytes result sent to driver
17/03/27 12:10:51 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 23) in 695 ms on localhost (1/1)
17/03/27 12:10:51 INFO DAGScheduler: ResultStage 1 (runJob at PythonRDD.scala:393) finished in 0.695 s
17/03/27 12:10:51 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/03/27 12:10:51 INFO DAGScheduler: Job 0 finished: runJob at PythonRDD.scala:393, took 9.932654 s
Unhandled Exception: Razorvine.Pickle.InvalidOpcodeException: invalid pickle opcode: 0
at Razorvine.Pickle.Unpickler.dispatch(Int16 key)
at Razorvine.Pickle.Unpickler.load(Stream stream)
at Microsoft.Spark.CSharp.Sql.PythonSerDe.GetUnpickledObjects(Byte[] buffer)
at Microsoft.Spark.CSharp.Core.RDDCollector.<Collect>d__0.MoveNext()
at Microsoft.Spark.CSharp.Core.RDD`1.<ToLocalIterator>d__66.MoveNext()
at ElectricalTests.CountDataPoints.Program.Main(String[] args) in D:\clients\quantumscape\quantumscape-etd\spark\samples\ElectricalTests.CountDataPoints\Program.cs:line 78
The offending code:
// Read some raw data points
var rawDataPoints = sqlContext
    .Read()
    .Format("org.apache.spark.sql.cassandra")
    .Options(new Dictionary<string, string>() {{"keyspace", cassandraKs}, {"table", cassandraTable}})
    .Load()
    .GroupBy("idtest_run", "cycle_id", "pulse_id", "step_id")
    .Count();
// convert to RDD, execute group and collect the results
var rddItems = rawDataPoints.ToRDD().ToLocalIterator();
foreach (var i in rddItems)
{
    Console.WriteLine(i);
}
Note that this code works if I switch from ToLocalIterator() to Collect() on the RDD.
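A minimal sketch of that workaround, assuming the same `sqlContext`, `cassandraKs`, and `cassandraTable` values as the snippet above. Here they are passed as parameters to a hypothetical `PrintGroupCounts` helper (the class and method names are illustrative only). The query is unchanged; the only difference is that results are fetched with `Collect()` instead of `ToLocalIterator()`. Running it still requires a working Mobius/Spark setup with the Cassandra connector.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Sql;

// Hypothetical wrapper class; only the query and the Collect() call come from the report above.
static class CountDataPointsWorkaround
{
    // Hypothetical helper: same query as the original, but results are
    // fetched with Collect() rather than ToLocalIterator().
    public static void PrintGroupCounts(SqlContext sqlContext, string cassandraKs, string cassandraTable)
    {
        // Read raw data points from Cassandra and count rows per group
        var rawDataPoints = sqlContext
            .Read()
            .Format("org.apache.spark.sql.cassandra")
            .Options(new Dictionary<string, string> { { "keyspace", cassandraKs }, { "table", cassandraTable } })
            .Load()
            .GroupBy("idtest_run", "cycle_id", "pulse_id", "step_id")
            .Count();

        // Collect() brings every row back to the driver in one array,
        // instead of iterating partition by partition via ToLocalIterator().
        var rows = rawDataPoints.ToRDD().Collect();
        foreach (var row in rows)
        {
            Console.WriteLine(row);
        }
    }
}
```

The trade-off is memory. `Collect()` materializes the whole result on the driver, whereas `ToLocalIterator()` is meant to pull one partition at a time. This is fine for a small grouped count but may not be for large results.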
@Aaronontheweb The ToLocalIterator and Collect implementations do not seem to share code that they could share, which would explain the difference in behavior. I do not know why they were implemented separately; I will ask other contributors and report back.
Not sure if it's the same issue, but I'll share mine with you. I'm also using 1.6.2, and the following code fails:
var dataframe = sqlContext.Read().Json(path);
Row[] rows = dataframe.Rdd.Take(10);
[2017-04-02 17:10:26,781] [1] [ERROR] [TestSparkCLR.SparkJob] - invalid pickle opcode: 0
[2017-04-02 17:10:26,783] [1] [ERROR] [TestSparkCLR.SparkJob] -
*******************************************************************************************************************************
at Razorvine.Pickle.Unpickler.dispatch (System.Int16 key) [0x00479] in <0d1ed9b414d44a7c8155c170867d3c13>:0
at Razorvine.Pickle.Unpickler.load (System.IO.Stream stream) [0x0001e] in <0d1ed9b414d44a7c8155c170867d3c13>:0
at Razorvine.Pickle.Unpickler.loads (System.Byte[] pickledata) [0x00007] in <0d1ed9b414d44a7c8155c170867d3c13>:0
at Microsoft.Spark.CSharp.Sql.PythonSerDe.GetUnpickledObjects (System.Byte[] buffer) [0x00005] in <697ee88a70054fac8fc7fb900202ec50>:0
at Microsoft.Spark.CSharp.Core.RDDCollector+<Collect>d__0.MoveNext () [0x00174] in <697ee88a70054fac8fc7fb900202ec50>:0
at System.Linq.Enumerable+<CastIterator>c__Iterator17`1[TResult].MoveNext () [0x00080] in <198d428ffe33413aa3c4cb6d388f57b3>:0
at System.Collections.Generic.List`1[T].InsertRange (System.Int32 index, System.Collections.Generic.IEnumerable`1[T] collection) [0x000ff] in <a09726166c1843dfba9f344410894800>:0
at System.Collections.Generic.List`1[T].AddRange (System.Collections.Generic.IEnumerable`1[T] collection) [0x00000] in <a09726166c1843dfba9f344410894800>:0
at Microsoft.Spark.CSharp.Core.RDD`1[T].Take (System.Int32 num) [0x000b3] in <697ee88a70054fac8fc7fb900202ec50>:0
at TestSparkCLR.SparkJob.Main (System.String[] args) [0x00052] in <7c16fd65c0444e989dec03054b22c87d>:0
*******************************************************************************************************************************
@grozeille It seems the root cause of both issues is the same. I do not have any additional information yet and will update this issue after investigating.