I expect the output to be the count of every word in the input file, but instead the output is the entire input file, unchanged. My mapper class extends Mapper<LongWritable, Text, Text, IntWritable> and my reducer class extends Reducer<Text, IntWritable, Text, IntWritable>. Here is my code:

driver.java

public class driver extends Configured implements Tool{
     
     public int run(String[] args) throws Exception
       {
        Configuration conf = new Configuration();
              Job job = new Job(conf, "wordcount");
              
              job.setMapperClass(mapper.class);
              job.setReducerClass(reducer.class);
              
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(Text.class);
              job.setInputFormatClass(KeyValueTextInputFormat.class);
              
              FileInputFormat.addInputPath(job, new Path(args[0]));
              FileOutputFormat.setOutputPath(job, new Path(args[1]));
              
              job.waitForCompletion(true);
              //JobClient.runJob((JobConf) conf);
              //System.exit(job.waitForCompletion(true) ? 0 : 1);
             return 0;
       }
     
      public static void main(String[] args) throws Exception
      {
          long start = System.currentTimeMillis();
            //int res = ToolRunner.run(new Configuration(), new driver(),args);
          
           int res = ToolRunner.run(new Configuration(), new driver(),args);
            
            long stop = System.currentTimeMillis();
            System.out.println ("Time: " + (stop-start));
            System.exit(res);
      }
}

mapper.java

public class mapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
      //hadoop supported data types
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
     
      //map method that performs the tokenizer job and framing the initial key value pairs
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
            String line = value.toString();
          StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens())
            {
               word.set(tokenizer.nextToken());
                 output.collect(word, one);
            }
       }
}

reducer.java
public class reducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
      //reduce method accepts the Key Value pairs from mappers, do the aggregation based on keys and produce the final out put
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
            int sum = 0;
          while (values.hasNext())
          {
               sum += values.next().get();
          }
          output.collect(key, new IntWritable(sum));
      }
}

  • Which version of hadoop do you use? What Mapper and Reducer do you import? Commented Nov 3, 2014 at 10:16

3 Answers

You have mixed up the old and new MapReduce APIs. I think you tried to write the WordCount program with the new API but took snippets from the old API (an old blog post, perhaps). You can find the problem yourself if you add the @Override annotation to both the map and reduce methods: the compiler will then complain that they do not override anything.

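See how the signatures changed between the two APIs: in the new API (org.apache.hadoop.mapreduce), map takes (key, value, Context context) and reduce takes (key, Iterable<VALUEIN> values, Context context). OutputCollector and Reporter are gone, and output is written with context.write(key, value).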

You wrote two new methods with the older signatures, so they do not override anything and are never called. The framework instead calls the inherited map and reduce methods, whose default implementations, as far as I know, are identity operations, so the input is passed straight through to the output.
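
The overload-versus-override pitfall described above can be reproduced without Hadoop. The following is a minimal sketch in plain Java; BaseMapper, BrokenMapper, FixedMapper, and the List<String> used as a context are hypothetical stand-ins for illustration, not Hadoop classes. It assumes only that the base class ships an identity map method and that the framework calls that exact signature.

```java
import java.util.ArrayList;
import java.util.List;

class OverrideDemo {

    // Hypothetical stand-in for a framework base class whose default map() is an identity operation.
    static class BaseMapper {
        protected void map(String key, String value, List<String> context) {
            context.add(key + "\t" + value); // default: pass the record through unchanged
        }

        // The "framework" only ever calls the three-argument map() above.
        final List<String> run(String key, String value) {
            List<String> context = new ArrayList<>();
            map(key, value, context);
            return context;
        }
    }

    // Old-style signature: the extra parameter makes this an overload, so run() never calls it.
    // Adding @Override here would be a compile error, which is how the mistake gets caught.
    static class BrokenMapper extends BaseMapper {
        public void map(String key, String value, List<String> output, Object reporter) {
            for (String token : value.split("\\s+")) {
                output.add(token + "\t1");
            }
        }
    }

    // Matching signature: @Override compiles, and run() dispatches to this method.
    static class FixedMapper extends BaseMapper {
        @Override
        protected void map(String key, String value, List<String> context) {
            for (String token : value.split("\\s+")) {
                context.add(token + "\t1");
            }
        }
    }

    public static void main(String[] args) {
        // The broken mapper echoes its input; the fixed one emits a (word, 1) pair per token.
        System.out.println(new BrokenMapper().run("0", "a b a"));
        System.out.println(new FixedMapper().run("0", "a b a"));
    }
}
```

The broken class compiles cleanly, which is why the job runs without any error and simply copies its input to the output.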

Also, you should follow basic Java coding conventions: class names such as driver, mapper, and reducer should start with an uppercase letter.

3 Comments

+1 I agree. The default map and reduce implementations in the new API are indeed identity operations (the equivalent of the old API's IdentityMapper and IdentityReducer), which is why the input is replicated in the output.
I wonder, @vefthym, how the OP is earning upvotes with such a silly mistake. Hadoop should have deprecated the colliding classes; they confuse people a lot. SO is overflowing with these questions nowadays.
Well, a lot of people seem to make the same mistake, probably by looking here and there to see how Hadoop works. It's inevitable that some of the tutorials will be for different versions. By the way, @kishorer747, if this answer solved your problem, you can mark it as accepted; otherwise, please let us know why it didn't.

Try this version, which uses the old API (org.apache.hadoop.mapred) consistently:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;


public class WordCount  {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {

            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            System.out.println(line);
            while (tokenizer.hasMoreTokens()) {
                value.set(tokenizer.nextToken());
                output.collect(value, new IntWritable(1));
            }

        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }

            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Take the input and output paths from the command line instead of hard-coding them
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);

    }
}

Build a jar and run the following command on the command line:

hadoop jar WordCount.jar WordCount /inputfile /outputfile

If you are having problems with your code, try running this version. It contains the mapper, the reducer, and the main function, all written against the old API.

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;    
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);

      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }

  public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}

After that, create a jar of this code, say wordcount.jar, saved in your home directory (/home/user/wordcount.jar), and run the following command:

hadoop jar wordcount.jar classname /inputfile /outputfile

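This creates a directory named outputfile under the root (/) directory of HDFS. Because the job runs a reducer with the old API, the result file is named part-00000. View it with:

hadoop dfs -cat /outputfile/part-00000

With these steps your word count program should run successfully.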

1 Comment

Thanks. I am familiar with this code, which extends the MapReduceBase class, but I want my mapper and reducer classes to extend the Mapper and Reducer classes instead.
