scalding
scalding copied to clipboard
How to groupBy custom object
When I groupBy custom object it throws Exception.
017-02-16 20:02:43,650 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : cascading.CascadingException: unable to compare stream elements in position: 0
at cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:164)
at cascading.tuple.hadoop.util.GroupingSortingComparator.compare(GroupingSortingComparator.java:54)
at cascading.tuple.hadoop.util.ReverseGroupingSortingComparator.compare(ReverseGroupingSortingComparator.java:31)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:1269)
at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:74)
at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1597)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1486)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:460)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: data_algorithms.ch01.scalding.SecondarySortUsingScalding$DateTemperature.<init>()
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
at cascading.tuple.hadoop.TupleSerialization$SerializationElementReader.read(TupleSerialization.java:628)
at cascading.tuple.hadoop.io.HadoopTupleInputStream.readType(HadoopTupleInputStream.java:105)
at cascading.tuple.hadoop.io.HadoopTupleInputStream.getNextElement(HadoopTupleInputStream.java:52)
at cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:77)
at cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:33)
at cascading.tuple.hadoop.util.DelegatingTupleElementComparator.compare(DelegatingTupleElementComparator.java:74)
at cascading.tuple.hadoop.util.DelegatingTupleElementComparator.compare(DelegatingTupleElementComparator.java:34)
at cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:160)
... 14 more
Caused by: java.lang.NoSuchMethodException: data_algorithms.ch01.scalding.SecondarySortUsingScalding$DateTemperature.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.getDeclaredConstructor(Class.java:2178)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
... 24 more
2017-02-16 20:02:43,656 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
2017-02-16 20:02:43,662 WARN [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Could not delete hdfs://gongxingfa1:8020/data-algorithms/secondary_sort/sclading/_temporary/1/_temporary/attempt_1486775631291_0013_m_000000_0
2017-02-16 20:02:43,668 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Stopping MapTask metrics system...
2017-02-16 20:02:43,669 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system stopped.
2017-02-16 20:02:43,669 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system shutdown complete.
This is my Code:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.twitter.scalding._
public class DateTemperaturePair implements Writable, WritableComparable<DateTemperaturePair> {
private final Text yearMonth = new Text();
private final Text day = new Text();
private final IntWritable temperature = new IntWritable();
public DateTemperaturePair() {
}
public DateTemperaturePair(String yearMonth, String day, int temperature) {
this.yearMonth.set(yearMonth);
this.day.set(day);
this.temperature.set(temperature);
}
@Override
public int compareTo(DateTemperaturePair pair) {
int compareValue = yearMonth.compareTo(pair.getYearMonth());
if (compareValue == 0) {
compareValue = temperature.compareTo(pair.getTemperature());
}
//return compareValue;
return -1 * compareValue;
}
public static DateTemperaturePair read(DataInput in) throws IOException {
DateTemperaturePair pair = new DateTemperaturePair();
pair.readFields(in);
return pair;
}
@Override
public void write(DataOutput out) throws IOException {
yearMonth.write(out);
day.write(out);
temperature.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
yearMonth.readFields(in);
day.readFields(in);
temperature.readFields(in);
}
public Text getYearMonth() {
return yearMonth;
}
public Text getDay() {
return day;
}
public IntWritable getTemperature() {
return temperature;
}
public void setYearMonth(String yearMonth) {
this.yearMonth.set(yearMonth);
}
public void setDay(String day) {
this.day.set(day);
}
public void setTemperature(int temperature) {
this.temperature.set(temperature);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DateTemperaturePair that = (DateTemperaturePair) o;
if (temperature != null ? !temperature.equals(that.temperature) : that.temperature != null) {
return false;
}
if (yearMonth != null ? !yearMonth.equals(that.yearMonth) : that.yearMonth != null) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = yearMonth != null ? yearMonth.hashCode() : 0;
result = 31 * result + (temperature != null ? temperature.hashCode() : 0);
return result;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("DateTemperaturePair{yearMonth=");
builder.append(yearMonth);
builder.append(", day=");
builder.append(day);
builder.append(", temperature=");
builder.append(temperature);
builder.append("}");
return builder.toString();
}
}
class SecondarySortUsingScalding(args: Args) extends Job(args) {
class DateTemperature extends DateTemperaturePair {
override def toString: String = this.getYearMonth.toString
}
class DateTemperatureOrdering extends Ordering[DateTemperature] {
override def compare(x: DateTemperature, y: DateTemperature): Int = x.compareTo(y)
}
val schema = List('year, 'month, 'day, 'temperature)
val temperatures = Csv(args("input"), fields = schema)
val pairs = temperatures.mapTo(('year, 'month, 'day, 'temperature) -> ('dateTemperature, 'temperature)) {
line: (String, String, String, String) =>
( {
val dateTemperature = new DateTemperature()
dateTemperature.setYearMonth(line._1 + "-" + line._2)
dateTemperature.setDay(line._3)
dateTemperature.setTemperature(line._4.toInt)
dateTemperature
}, line._4.toInt)
}
implicit val groupOrdering = new DateTemperatureOrdering
val sortedTemperatures = pairs
.groupBy('dateTemperature) { group => group.sortBy('temperature).reverse.mkString('temperature -> 'temperatures, ",") }
sortedTemperatures.write(Tsv(args("output")))
}
Data Format: 2000,12,04,10 2000,12,02,-20 2013,01,23,90 2012,12,25,10 2000,11,07,30 2012,12,22,-20 2000,11,24,-40 2012,12,21,30 2012,12,23,60 2012,12,24,70 2013,01,22,80 2013,01,24,70 2013,01,20,-10 2000,11,01,20
Note the cause:
Caused by: java.lang.NoSuchMethodException: data_algorithms.ch01.scalding.SecondarySortUsingScalding$DateTemperature.<init>()
- This is using the Fields API. We strongly suggest using the TypedAPI which is much safer to use, and better optimized at this point.
- you don't need to do the hadoop writable stuff. Scalding does not use it. It uses Kryo for serialization (which can automatically handle most cases).
- you shouldn't define inner classes in a scalding job. It is kind of a "gotcha" of scala that it captures the enclosing class, and can break things or give very bad performance. If you move the
DateTemperatureoutside it should work. - I can't see how your ordering is used. Orderings are used in the typed API, but in the fields API you need comparable classes (no ordering is found or used).
@johnynek It‘s OK.Thanks.