
I can't figure out what the bug is. When I remove the line job.setSortComparatorClass(LongWritable.DecreasingComparator.class); the job produces output, but with that line in place I get the exception below.

I'm trying to get the reducer's output in decreasing order by value, which is why I set a sort comparator class. Please help me out.

package topten.mostviewed.movies;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MostViewdReducer extends Reducer<Text, IntWritable, Text, LongWritable>
{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        // Each incoming value represents one view of the movie, so counting
        // the values gives the total view count for this key.
        int sum = 0;
        for (IntWritable value : values)
        {
            sum = sum + 1;
        }
        context.write(key, new LongWritable(sum));
    }
}
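
And my driver class: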
package topten.mostviewed.movies;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MostViewdDriver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2)
        {
            System.err.println("Usage: movie <input> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Movie");
        job.setJarByClass(MostViewdDriver.class);
        job.setMapperClass(MostviewdMapper.class);
        job.setReducerClass(MostViewdReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setSortComparatorClass(LongWritable.DecreasingComparator.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The exception I'm getting is as below:

18/10/11 11:35:05 INFO mapreduce.Job: Task Id : attempt_1539236679371_0004_r_000000_2, Status : FAILED
Error: java.lang.ArrayIndexOutOfBoundsException: 7
    at org.apache.hadoop.io.WritableComparator.readInt(WritableComparator.java:212)
    at org.apache.hadoop.io.WritableComparator.readLong(WritableComparator.java:226)
    at org.apache.hadoop.io.LongWritable$Comparator.compare(LongWritable.java:91)
    at org.apache.hadoop.io.LongWritable$DecreasingComparator.compare(LongWritable.java:106)
    at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:158)
    at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:307)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

1 Answer

Your map output keys are ints, but you tried to use a comparator intended for longs. Replace LongWritable.DecreasingComparator.class with IntWritable.DecreasingComparator.class.
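
If your Hadoop version doesn't ship a decreasing comparator for IntWritable, you can get the same effect from a custom WritableComparator that reverses the natural order. A minimal sketch (the class name DescendingIntWritableComparator is hypothetical):

package topten.mostviewed.movies;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical comparator: sorts IntWritable map output keys in decreasing
// order by reversing the natural comparison.
public class DescendingIntWritableComparator extends WritableComparator
{
    public DescendingIntWritableComparator()
    {
        super(IntWritable.class, true); // true = instantiate keys for deserialization
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public int compare(WritableComparable a, WritableComparable b)
    {
        return b.compareTo(a); // reversed natural order
    }
}

Whichever comparator you use, it must match the type you pass to setMapOutputKeyClass; a mismatch makes the comparator read key bytes at the wrong offsets, which is exactly what the ArrayIndexOutOfBoundsException above is telling you.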


5 Comments

Sounds like a reasonable thing to do. Hadoop is probably just looking at the raw bytes, and longs and ints serialize to byte arrays of different lengths, which leads to problems like this. Although it's arguably a framework bug: Hadoop should check the array length and throw a more meaningful message in such cases.
The comparator class works only on LongWritable, and only on the output of the reduce function.
No, the sort comparator works on map output keys, not reduce output keys. Its job (along with the grouping comparator) is to order and group key/value pairs by key, so that each such group can be fed into a single reduce() call.
Then which class should be used to sort the output of the reduce function?
Even after I changed the map's output from IntWritable to LongWritable, it's giving the same exception.
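
To sort the reducer's output by count (the follow-up question in the comments), the sort comparator of this job won't help, because it only sees the map output keys. The usual pattern is a second job that swaps key and value, so that the count becomes the key being sorted. A minimal sketch, assuming the first job wrote lines of the form movie<TAB>count (SortByCountDriver and SwapMapper are hypothetical names):

package topten.mostviewed.movies;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical second job: reads the first job's movie<TAB>count output,
// swaps key and value, and sorts the counts in decreasing order.
public class SortByCountDriver
{
    public static class SwapMapper extends Mapper<Text, Text, LongWritable, Text>
    {
        @Override
        public void map(Text movie, Text count, Context context) throws IOException, InterruptedException
        {
            // Swap key and value so the count becomes the sort key
            // (assumes every input line is a well-formed movie<TAB>count pair).
            context.write(new LongWritable(Long.parseLong(count.toString())), movie);
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sort by count");
        job.setJarByClass(SortByCountDriver.class);
        // KeyValueTextInputFormat splits each line on the first tab.
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapperClass(SwapMapper.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // Now the comparator type matches the map output key type.
        job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
        // A single reduce task (using the default identity reducer) yields
        // one globally sorted output file.
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}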
