RDD<string>.SaveAsTextFile yields a SerializationException
Context:
I'm trying to write each RDD of a DStream to text files with SaveAsTextFile.
My code looks similar to this:
var stream = EventHubsUtils.CreateUnionStream( ssc, eventhubsParams.Select( v => new Tuple<string, string>( v.Key, v.Value ) ) );
// The last Map produces a comma-separated string, so the DStream holds strings, not JTokens.
DStream<string> timestampEntries = stream
    .Map( timestamp => Encoding.UTF8.GetString( timestamp ) )
    .Map( timestamp => JToken.Parse( timestamp ) )
    .Map( jToken =>
        string.Join( ",",
            jToken["xxx"],
            jToken["yyy"],
            jToken["zzz"] ) );
timestampEntries.ForeachRDD(
    rdd =>
    {
        rdd.SaveAsTextFile( $"{outputPath}/output" );
    });
Exception:
I'm getting a serialization error that I think comes from an anonymous function created by the SaveAsTextFile call. The LatencyEntityGenerator+<>c__DisplayClass0_0 type in the trace below is that anonymous function.
Unhandled Exception:
System.Runtime.Serialization.SerializationException: Type 'Microsoft.Spark.CSharp.Examples.LatencyEntityGenerator+<>c__DisplayClass0_0' in Assembly 'LatencyEntityGenerator, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null' is not marked as serializable.
at System.Runtime.Serialization.FormatterServices.InternalGetSerializableMembers (System.RuntimeType type) <0x7f71ed020f70 + 0x00401> in <filename unknown>:0
at System.Runtime.Serialization.FormatterServices.GetSerializableMembers (System.Type type, StreamingContext context) <0x7f71ed021770 + 0x001cb> in <filename unknown>:0
at System.Runtime.Serialization.Formatters.Binary.WriteObjectInfo.InitMemberInfo () <0x7f71ed010e50 + 0x000e9> in <filename unknown>:0
at System.Runtime.Serialization.Formatters.Binary.WriteObjectInfo.InitSerialize (System.Object obj, ISurrogateSelector surrogateSelector, StreamingContext context, System.Runtime.Serialization.Formatters.Binary.SerObjectInfoInit serObjectInfoInit, IFormatterConverter converter, System.Runtime.Serialization.Formatters.Binary.ObjectWriter objectWriter, System.Runtime.Serialization.SerializationBinder binder) <0x7f71ed010170 + 0x0040a> in <filename unknown>:0
at System.Runtime.Serialization.Formatters.Binary.WriteObjectInfo.Serialize (System.Object obj, ISurrogateSelector surrogateSelector, StreamingContext context, System.Runtime.Serialization.Formatters.Binary.SerObjectInfoInit serObjectInfoInit, IFormatterConverter converter, System.Runtime.Serialization.Formatters.Binary.ObjectWriter objectWriter, System.Runtime.Serialization.SerializationBinder binder) <0x7f71ed010100 + 0x00064> in <filename unknown>:0
at System.Runtime.Serialization.Formatters.Binary.ObjectWriter.Write (System.Runtime.Serialization.Formatters.Binary.WriteObjectInfo objectInfo, System.Runtime.Serialization.Formatters.Binary.NameInfo memberNameInfo, System.Runtime.Serialization.Formatters.Binary.NameInfo typeNameInfo) <0x7f71ed017cf0 + 0x00277> in <filename unknown>:0
at System.Runtime.Serialization.Formatters.Binary.ObjectWriter.Serialize (System.Object graph, System.Runtime.Remoting.Messaging.Header[] inHeaders, System.Runtime.Serialization.Formatters.Binary.__BinaryWriter serWriter, Boolean fCheck) <0x7f71ed016ac0 + 0x005fb> in <filename unknown>:0
at System.Runtime.Serialization.Formatters.Binary.BinaryFormatter.Serialize (System.IO.Stream serializationStream, System.Object graph, System.Runtime.Remoting.Messaging.Header[] headers, Boolean fCheck) <0x7f71ed00d5d0 + 0x0012e> in <filename unknown>:0
at System.Runtime.Serialization.Formatters.Binary.BinaryFormatter.Serialize (System.IO.Stream serializationStream, System.Object graph, System.Runtime.Remoting.Messaging.Header[] headers) <0x7f71ed00d5a0 + 0x00021> in <filename unknown>:0
at System.Runtime.Serialization.Formatters.Binary.BinaryFormatter.Serialize (System.IO.Stream serializationStream, System.Object graph) <0x7f71ed00d580 + 0x00018> in <filename unknown>:0
at Microsoft.Spark.CSharp.Streaming.DStream`1[T].ForeachRDD (System.Action`2 f) <0x4156cf70 + 0x00085> in <filename unknown>:0
at Microsoft.Spark.CSharp.Streaming.DStream`1[T].ForeachRDD (System.Action`1 f) <0x4156cdc0 + 0x00113> in <filename unknown>:0
at Microsoft.Spark.CSharp.Examples.LatencyEntityGenerator+<>c__DisplayClass0_0.<Main>b__0 () <0x41566dc0 + 0x005b3> in <filename unknown>:0
at Microsoft.Spark.CSharp.Streaming.StreamingContext.GetOrCreate (System.String checkpointPath, System.Func`1 creatingFunc) <0x41566950 + 0x00038> in <filename unknown>:0
at Microsoft.Spark.CSharp.Examples.LatencyEntityGenerator.Main (System.String[] args) <0x41528d60 + 0x00433> in <filename unknown>:0
What is LatencyEntityGenerator? Is it the class that has the main method you are running?
Yes. However, the type reported as non-serializable is NOT that class itself.
The +<>c__DisplayClass0_0 suffix suggests that it is some compiler-generated anonymous function.
Also, just to give the complete picture:
The code works fine if, instead of calling SaveAsTextFile, we call .Collect() and simply loop over the results to print them to the console.
This is why I believe the anonymous function comes from somewhere inside SaveAsTextFile.
I think the problem is the anonymous method you pass to ForeachRDD: the compiler generates a class for it, and that class is not marked [Serializable]. Try moving that code into a method of a class marked [Serializable] and pass that method to ForeachRDD. That might fix the issue.
Use PiHelper from the examples as a reference.
@skaarthik So I removed all the processing from the Map function (apart from Encoding.UTF8.GetString which is needed to get from byte[] to string) and I'm still getting the same error. Basically, here's my code right now:
var stream = EventHubsUtils.CreateUnionStream( ssc, eventhubsParams.Select( v => new Tuple<string, string>( v.Key, v.Value ) ) );
DStream<string> timestampEntries = stream
    .Map( timestamp => Encoding.UTF8.GetString( timestamp ) );
timestampEntries.ForeachRDD(
    rdd =>
    {
        rdd.SaveAsTextFile( $"{outputPath}/output" );
    });
If I do this instead of SaveAsTextFile, everything works ok.
timestampEntries.ForeachRDD(
    rdd =>
    {
        foreach ( string timestamp in rdd.Collect() )
        {
            Console.WriteLine( timestamp );
        }
        //rdd.SaveAsTextFile( $"{outputPath}/output" );
    });
Does this mean that it's either something in SaveAsTextFile or Encoding.UTF8.GetString that's not serializable? I am a little unclear on how to verify that at the moment, but I'll keep looking...
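One way to check this outside Spark, offered as a sketch rather than a confirmed diagnosis: the stack trace shows DStream.ForeachRDD passing the function to BinaryFormatter.Serialize, and serializing a delegate also means serializing its Target object. Comparing the two snippets above, the failing ForeachRDD lambda uses the outer variable outputPath, while the working Collect version uses no outer variables at all. The program below (ClosureCheck, MakeCapturingLambda and MakeNonCapturingLambda are hypothetical names) prints the Target type of each kind of lambda and whether that type is marked [Serializable].

```csharp
using System;

public static class ClosureCheck
{
    // True when the delegate has no Target (static method) or the Target's type
    // is marked [Serializable]; BinaryFormatter has to serialize a non-null Target.
    public static bool IsTargetSerializable(Delegate d) =>
        d.Target == null || d.Target.GetType().IsSerializable;

    // Uses the parameter outputPath inside the lambda, so the compiler hoists it
    // into a generated <>c__DisplayClass type, which is not marked [Serializable].
    public static Action<string> MakeCapturingLambda(string outputPath) =>
        line => Console.WriteLine($"{outputPath}/output: {line}");

    // Uses no outer variables, so no display class is generated for it.
    public static Action<string> MakeNonCapturingLambda() =>
        line => Console.WriteLine(line);

    private static string Describe(Delegate d) =>
        $"{d.Target?.GetType().Name ?? "(static method)"} serializable={IsTargetSerializable(d)}";

    public static void Main()
    {
        Console.WriteLine("capturing:     " + Describe(MakeCapturingLambda("/tmp/out")));
        Console.WriteLine("non-capturing: " + Describe(MakeNonCapturingLambda()));
    }
}
```

With the Roslyn compiler the capturing lambda's Target is a generated <>c__DisplayClass type, the same kind of type named in the exception. That suggests, though it does not prove, that the closure over outputPath, rather than SaveAsTextFile or Encoding.UTF8.GetString, is what fails to serialize. The exact Target type of the non-capturing lambda depends on the compiler, so treat that line of output as informational only.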
Did you try creating non-anonymous methods to use with Map and ForeachRDD?
Exact same problem here. Following your advice in the other issue and the Pi example, I call and serialize as shown below.
It looks like any .NET function call in ForeachRDD results in this error, whether or not it is wrapped in a serializable class. Any idea?
countByLogLevelAndTime.ForeachRDD(countByLogLevel =>
{
    //countByLogLevel.SaveAsTextFile(string.Format("{0}/{1}", appOutputPath, Guid.NewGuid()));
    foreach (var logCount in countByLogLevel.Collect())
    {
        new Saver().Save(appOutputPath, logCount);
        Console.WriteLine($"detailed log:{logCount}");
    }
});
[Serializable]
private class Saver
{
    public void Save(string path, string log)
    {
        // Path.Combine uses the platform separator; "{0}\\{1}" would put a
        // literal backslash into the file name on Linux/Mono.
        File.WriteAllText(Path.Combine(path, Guid.NewGuid().ToString()), log);
    }
}
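A hedged reading of why the Saver version may still fail: the lambda passed to ForeachRDD still uses appOutputPath, so the compiler still captures it in a generated closure class that is not [Serializable], even though Saver itself is. Following the PiHelper suggestion more literally would mean making the delegate handed to ForeachRDD itself a method on a [Serializable] class that keeps the path in a field. The sketch below compares the two shapes without Spark. LogSaver, SaveAll, SaverCheck, OriginalShape and HelperShape are hypothetical names, and SaveAll takes IEnumerable<string> instead of Mobius's RDD<string> only so that it compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: the path lives in a field, and the method given to
// ForeachRDD is an instance method of this [Serializable] class, so the
// delegate's Target is LogSaver rather than a compiler-generated closure.
[Serializable]
public class LogSaver
{
    private readonly string appOutputPath;

    public LogSaver(string appOutputPath)
    {
        this.appOutputPath = appOutputPath;
    }

    // In a Mobius job this would presumably take RDD<string> and loop over rdd.Collect().
    public void SaveAll(IEnumerable<string> logs)
    {
        foreach (var log in logs)
        {
            File.WriteAllText(Path.Combine(appOutputPath, Guid.NewGuid().ToString()), log);
        }
    }
}

public static class SaverCheck
{
    public static bool IsTargetSerializable(Delegate d) =>
        d.Target == null || d.Target.GetType().IsSerializable;

    // Same shape as the code above: the lambda itself uses appOutputPath,
    // so it closes over it even if the work is done by a serializable class.
    public static Action<IEnumerable<string>> OriginalShape(string appOutputPath) =>
        logs =>
        {
            foreach (var log in logs)
            {
                File.WriteAllText(Path.Combine(appOutputPath, Guid.NewGuid().ToString()), log);
            }
        };

    // The delegate is bound directly to a LogSaver instance method.
    public static Action<IEnumerable<string>> HelperShape(string appOutputPath) =>
        new LogSaver(appOutputPath).SaveAll;

    public static void Main()
    {
        string dir = Path.Combine(Path.GetTempPath(), "logsaver-demo");
        Directory.CreateDirectory(dir);
        Console.WriteLine($"lambda using appOutputPath: serializable={IsTargetSerializable(OriginalShape(dir))}");
        Console.WriteLine($"method on [Serializable] helper: serializable={IsTargetSerializable(HelperShape(dir))}");
        HelperShape(dir)(new[] { "sample log line" });
    }
}
```

In the job itself the equivalent call would presumably be countByLogLevelAndTime.ForeachRDD(new LogSaver(appOutputPath).SaveAll), with SaveAll taking an RDD<string>; this has not been verified against Mobius. The idea is that the serializer then only has to handle LogSaver and its string field, both of which are serializable.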
I am having the same issue, using Mono 5 on Linux.