scalding

How to groupBy custom object

Open · GeekTemo opened this issue on Feb 16, 2017 · 2 comments

When I groupBy a custom object, the job throws an exception:
2017-02-16 20:02:43,650 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : cascading.CascadingException: unable to compare stream elements in position: 0
	at cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:164)
	at cascading.tuple.hadoop.util.GroupingSortingComparator.compare(GroupingSortingComparator.java:54)
	at cascading.tuple.hadoop.util.ReverseGroupingSortingComparator.compare(ReverseGroupingSortingComparator.java:31)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:1269)
	at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:74)
	at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1597)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1486)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:460)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: data_algorithms.ch01.scalding.SecondarySortUsingScalding$DateTemperature.<init>()
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
	at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
	at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
	at cascading.tuple.hadoop.TupleSerialization$SerializationElementReader.read(TupleSerialization.java:628)
	at cascading.tuple.hadoop.io.HadoopTupleInputStream.readType(HadoopTupleInputStream.java:105)
	at cascading.tuple.hadoop.io.HadoopTupleInputStream.getNextElement(HadoopTupleInputStream.java:52)
	at cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:77)
	at cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:33)
	at cascading.tuple.hadoop.util.DelegatingTupleElementComparator.compare(DelegatingTupleElementComparator.java:74)
	at cascading.tuple.hadoop.util.DelegatingTupleElementComparator.compare(DelegatingTupleElementComparator.java:34)
	at cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:160)
	... 14 more
Caused by: java.lang.NoSuchMethodException: data_algorithms.ch01.scalding.SecondarySortUsingScalding$DateTemperature.<init>()
	at java.lang.Class.getConstructor0(Class.java:3082)
	at java.lang.Class.getDeclaredConstructor(Class.java:2178)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
	... 24 more

2017-02-16 20:02:43,656 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
2017-02-16 20:02:43,662 WARN [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Could not delete hdfs://gongxingfa1:8020/data-algorithms/secondary_sort/sclading/_temporary/1/_temporary/attempt_1486775631291_0013_m_000000_0
2017-02-16 20:02:43,668 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Stopping MapTask metrics system...
2017-02-16 20:02:43,669 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system stopped.
2017-02-16 20:02:43,669 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system shutdown complete.
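The innermost `Caused by` above shows `ReflectionUtils.newInstance` calling `Class.getDeclaredConstructor()` from inside Hadoop's `WritableDeserializer`: the deserializer needs an empty instance before it can call `readFields` on it. Below is a stdlib-only sketch of that two-step pattern, not Hadoop's actual code. `SimpleRecord` and `roundTrip` are hypothetical names, and `writeUTF`/`writeInt` stand in for the `Text`/`IntWritable` fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;

public class ReflectiveReadDemo {

    // Hypothetical stand-in for a Writable: it writes its fields to a
    // stream and can re-populate an existing instance from one.
    public static class SimpleRecord {
        String yearMonth = "";
        int temperature;

        // Reflective deserialization depends on this zero-argument constructor.
        public SimpleRecord() {}

        void write(DataOutput out) throws IOException {
            out.writeUTF(yearMonth);
            out.writeInt(temperature);
        }

        void readFields(DataInput in) throws IOException {
            yearMonth = in.readUTF();
            temperature = in.readInt();
        }
    }

    // Serializes a record, then rebuilds it the way the trace suggests:
    // look up the no-arg constructor, create an empty instance, call readFields.
    static SimpleRecord roundTrip(SimpleRecord original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));

            // Throws NoSuchMethodException if the class has no zero-arg constructor.
            Constructor<SimpleRecord> ctor = SimpleRecord.class.getDeclaredConstructor();
            SimpleRecord copy = ctor.newInstance();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SimpleRecord record = new SimpleRecord();
        record.yearMonth = "2012-12";
        record.temperature = 70;

        SimpleRecord copy = roundTrip(record);
        System.out.println(copy.yearMonth + " " + copy.temperature); // prints "2012-12 70"
    }
}
```

If `SimpleRecord` were a non-static inner class instead, the `getDeclaredConstructor()` call would throw `NoSuchMethodException`, which is the failure reported in the trace.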

This is my code:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.twitter.scalding._


public class DateTemperaturePair implements Writable, WritableComparable<DateTemperaturePair> {
    private final Text yearMonth = new Text();
    private final Text day = new Text();
    private final IntWritable temperature = new IntWritable();

    public DateTemperaturePair() {

    }


    public DateTemperaturePair(String yearMonth, String day, int temperature) {
        this.yearMonth.set(yearMonth);
        this.day.set(day);
        this.temperature.set(temperature);
    }

    @Override
    public int compareTo(DateTemperaturePair pair) {
        int compareValue = yearMonth.compareTo(pair.getYearMonth());
        if (compareValue == 0) {
            compareValue = temperature.compareTo(pair.getTemperature());
        }
        //return compareValue; 
        return -1 * compareValue;
    }

    public static DateTemperaturePair read(DataInput in) throws IOException {
        DateTemperaturePair pair = new DateTemperaturePair();
        pair.readFields(in);
        return pair;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        yearMonth.write(out);
        day.write(out);
        temperature.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        yearMonth.readFields(in);
        day.readFields(in);
        temperature.readFields(in);
    }

    public Text getYearMonth() {
        return yearMonth;
    }

    public Text getDay() {
        return day;
    }

    public IntWritable getTemperature() {
        return temperature;
    }


    public void setYearMonth(String yearMonth) {
        this.yearMonth.set(yearMonth);
    }

    public void setDay(String day) {
        this.day.set(day);
    }

    public void setTemperature(int temperature) {
        this.temperature.set(temperature);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        DateTemperaturePair that = (DateTemperaturePair) o;
        if (temperature != null ? !temperature.equals(that.temperature) : that.temperature != null) {
            return false;
        }
        if (yearMonth != null ? !yearMonth.equals(that.yearMonth) : that.yearMonth != null) {
            return false;
        }

        return true;
    }

    @Override
    public int hashCode() {
        int result = yearMonth != null ? yearMonth.hashCode() : 0;
        result = 31 * result + (temperature != null ? temperature.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("DateTemperaturePair{yearMonth=");
        builder.append(yearMonth);
        builder.append(", day=");
        builder.append(day);
        builder.append(", temperature=");
        builder.append(temperature);
        builder.append("}");
        return builder.toString();
    }
}

class SecondarySortUsingScalding(args: Args) extends Job(args) {

  class DateTemperature extends DateTemperaturePair {
    override def toString: String = this.getYearMonth.toString
  }

  class DateTemperatureOrdering extends Ordering[DateTemperature] {
    override def compare(x: DateTemperature, y: DateTemperature): Int = x.compareTo(y)
  }

  val schema = List('year, 'month, 'day, 'temperature)
  val temperatures = Csv(args("input"), fields = schema)
  val pairs = temperatures.mapTo(('year, 'month, 'day, 'temperature) -> ('dateTemperature, 'temperature)) {
    line: (String, String, String, String) =>
      ( {
        val dateTemperature = new DateTemperature()
        dateTemperature.setYearMonth(line._1 + "-" + line._2)
        dateTemperature.setDay(line._3)
        dateTemperature.setTemperature(line._4.toInt)
        dateTemperature
      }, line._4.toInt)
  }
  implicit val groupOrdering = new DateTemperatureOrdering
  val sortedTemperatures = pairs
    .groupBy('dateTemperature) { group => group.sortBy('temperature).reverse.mkString('temperature -> 'temperatures, ",") }
  sortedTemperatures.write(Tsv(args("output")))
}

Data format (year,month,day,temperature):

2000,12,04,10
2000,12,02,-20
2013,01,23,90
2012,12,25,10
2000,11,07,30
2012,12,22,-20
2000,11,24,-40
2012,12,21,30
2012,12,23,60
2012,12,24,70
2013,01,22,80
2013,01,24,70
2013,01,20,-10
2000,11,01,20
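To make the goal concrete, here is an in-memory, plain-Java sketch (not Scalding) of the secondary sort the job appears to intend, run on the sample records: group by year-month and list each month's temperatures in descending order, comma-separated. The grouping key is an assumption based on `setYearMonth(line._1 + "-" + line._2)` and the `sortBy('temperature).reverse` call; `SecondarySortSketch`, `SAMPLE`, and `secondarySort` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class SecondarySortSketch {

    // The sample records from the issue, as "year,month,day,temperature".
    static final List<String> SAMPLE = Arrays.asList(
            "2000,12,04,10", "2000,12,02,-20", "2013,01,23,90", "2012,12,25,10",
            "2000,11,07,30", "2012,12,22,-20", "2000,11,24,-40", "2012,12,21,30",
            "2012,12,23,60", "2012,12,24,70", "2013,01,22,80", "2013,01,24,70",
            "2013,01,20,-10", "2000,11,01,20");

    // Groups temperatures by "year-month" and joins each group in descending order.
    static Map<String, String> secondarySort(List<String> lines) {
        // TreeMap keeps the year-month keys in sorted order.
        Map<String, List<Integer>> byMonth = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            byMonth.computeIfAbsent(f[0] + "-" + f[1], k -> new ArrayList<>())
                   .add(Integer.parseInt(f[3]));
        }

        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : byMonth.entrySet()) {
            List<Integer> temps = entry.getValue();
            temps.sort(Comparator.reverseOrder()); // secondary sort: highest first
            result.put(entry.getKey(), temps.stream()
                    .map(t -> Integer.toString(t))
                    .collect(Collectors.joining(",")));
        }
        return result;
    }

    public static void main(String[] args) {
        secondarySort(SAMPLE).forEach((month, temps) -> System.out.println(month + "\t" + temps));
    }
}
```

With this data, `main` prints four lines in month order, including `2012-12	70,60,30,10,-20` and `2013-01	90,80,70,-10`.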

GeekTemo, Feb 16 '17 12:02

Note the cause:

Caused by: java.lang.NoSuchMethodException: data_algorithms.ch01.scalding.SecondarySortUsingScalding$DateTemperature.<init>()
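`<init>()` denotes a constructor taking no arguments. Because `DateTemperature` is declared inside the `SecondarySortUsingScalding` class, Scala compiles it as an inner class (hence the `$` in its name) whose constructor takes the enclosing job instance as a hidden parameter, so no true no-argument constructor exists. Non-static Java inner classes behave the same way. The sketch below, with hypothetical class names, runs the same lookup against a non-static inner class and a static nested one.

```java
public class InnerClassConstructorDemo {

    // Non-static inner class: javac adds a hidden constructor parameter
    // for the enclosing InnerClassConstructorDemo instance.
    class Inner {
        Inner() {}
    }

    // Static nested class: no enclosing instance, so its constructor
    // really takes zero arguments.
    static class Nested {
        Nested() {}
    }

    // The same lookup that fails in the stack trace.
    static boolean hasNoArgConstructor(Class<?> cls) {
        try {
            cls.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Inner:  " + hasNoArgConstructor(Inner.class));  // false
        System.out.println("Nested: " + hasNoArgConstructor(Nested.class)); // true
    }
}
```

Even though `Inner` declares `Inner() {}` in source, the compiled constructor expects the outer instance, which is why the lookup fails.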
  1. This uses the Fields API. We strongly suggest the Typed API instead; it is much safer to use and, at this point, better optimized.
  2. You don't need to implement Hadoop's Writable; Scalding does not rely on it. It uses Kryo for serialization, which handles most classes automatically.
  3. You shouldn't define inner classes in a Scalding job. It is a Scala "gotcha": an inner class captures a reference to the enclosing instance, which can break things or badly hurt performance. If you move DateTemperature outside the job class, it should work.
  4. I can't see how your Ordering would be used. The Typed API uses Orderings, but the Fields API needs Comparable classes; it neither looks for nor uses an Ordering.
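Points 3 and 4 together suggest a grouping key declared outside the job (so it has a genuine no-argument constructor) that implements `Comparable` itself, since an implicit `Ordering` is ignored by the Fields API. The plain-Java sketch below only illustrates that shape; it is not Scalding code. `YearMonthKey` is hypothetical, a Java static nested class stands in for a top-level class (neither has a hidden outer parameter), and the ordering (year-month ascending, temperature descending) is a design choice rather than the original `compareTo`, which negates the whole comparison.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class ComparableKeyDemo {

    // Static nested class: like a top-level class, it has a genuine
    // zero-argument constructor (no hidden outer-instance parameter).
    public static final class YearMonthKey implements Comparable<YearMonthKey> {
        private String yearMonth = "";
        private int temperature;

        public YearMonthKey() {}

        public YearMonthKey(String yearMonth, int temperature) {
            this.yearMonth = yearMonth;
            this.temperature = temperature;
        }

        // Natural ordering: year-month ascending, then temperature descending.
        @Override
        public int compareTo(YearMonthKey other) {
            int byMonth = yearMonth.compareTo(other.yearMonth);
            return byMonth != 0 ? byMonth : Integer.compare(other.temperature, temperature);
        }

        // equals/hashCode use the same two fields as compareTo.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof YearMonthKey)) return false;
            YearMonthKey that = (YearMonthKey) o;
            return temperature == that.temperature && yearMonth.equals(that.yearMonth);
        }

        @Override
        public int hashCode() {
            return Objects.hash(yearMonth, temperature);
        }

        @Override
        public String toString() {
            return yearMonth + ":" + temperature;
        }
    }

    public static void main(String[] args) {
        List<YearMonthKey> keys = new ArrayList<>();
        keys.add(new YearMonthKey("2012-12", 10));
        keys.add(new YearMonthKey("2000-11", -40));
        keys.add(new YearMonthKey("2012-12", 70));
        keys.add(new YearMonthKey("2000-11", 30));

        // No Comparator is passed, so Collections.sort uses compareTo.
        Collections.sort(keys);
        System.out.println(keys); // [2000-11:30, 2000-11:-40, 2012-12:70, 2012-12:10]
    }
}
```

Sorting without a `Comparator` relies on the class's own `compareTo`, which is the analogue of the Fields API requiring Comparable keys. Keeping `equals` and `hashCode` consistent with `compareTo` matters because grouping frameworks typically partition keys by hash.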

johnynek, Feb 16 '17 17:02

@johnynek It's OK now. Thanks.

GeekTemo, Feb 17 '17 03:02