3

I know that SortComparator is used to sort the map output by keys. I have written a custom SortComparator to understand the MapReduce framework better. This is my WordCount class with the custom SortComparator class.

package bananas;

import java.io.FileWriter;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {


  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);

      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();    
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {


      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }

      result.set(sum);
      context.write(key, result);
    }
  }

  public static class MyPartitoner extends Partitioner<Text, IntWritable>{

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {


        return Math.abs(key.hashCode()) % numPartitions;
    }  
  }

  public static class MySortComparator2 extends WritableComparator{

      protected MySortComparator2() {
          super();
          }

      @SuppressWarnings({ "rawtypes" })
    @Override
      public int compare(WritableComparable w1,WritableComparable w2){

          return 0;
      }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setSortComparatorClass(MySortComparator2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

but when I execute this I am getting this error

Error: java.lang.NullPointerException
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:157)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:1265)
    at org.apache.hadoop.util.QuickSort.fix(QuickSort.java:35)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:87)
    at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1593)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1482)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:720)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:790)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 

My custom SortComparator class looks fine to me. After mapping is done, MySortComparator2's compare method should receive "Text" keys as input, and since I am returning 0, no sorting should be done. That is what I expected to observe. I followed these tutorials:

http://codingjunkie.net/secondary-sort/

http://blog.zaloni.com/secondary-sorting-in-hadoop

http://www.bigdataspeak.com/2013/02/hadoop-how-to-do-secondary-sort-on_25.html

Thanks in advance I would appreciate some help.

3 Answers 3

5

Actually, there is a problem with the MySortComparator2 constructor. The code should look like:

protected MySortComparator2() {
      super(Text.class, true);
}

where the first parameter is your key class and the second parameter's value ensures WritableComparator is instantiated in a way that WritableComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) can invoke MySortComparator2.compare(WritableComparable a, WritableComparable b)

Sign up to request clarification or add additional context in comments.

Comments

2

You need to implement/override this method, too:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    // per your desired no-sort logic
    return 0;
}

I think that your comparator is being constructed in such a way that the variables mentioned in the super implementation are null (and this is the method that's being called in support of the sort - not the method you wrote above). That's why you're getting the null pointer exception. By overriding the method with an implementation that doesn't use the variables, you can avoid the exception.

Comments

0

As Chris Gerken said You need to override this method while extending WritableComparator or implement RawComparator instead of WritableComparator.

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return 0;
}

Also, as you said, you wanted to see no sorting done — but if you return 0, then every time MapReduce tries to sort/compare, it sees every key as the same. As a result, you will receive only one key-value pair: the key will be the first key from whichever map task finishes first, and the value will be the total number of words in the input file. For example, if your input is:

why are rockets cylindrical

your reduce output will be

why  4

since it assumes everything as the same key. I hope this helps.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.