I was going through the Hadoop MapReduce framework and tried out basic examples like WordCount and MaxTemperature, enough to create a MapReduce task for my project. I only want to know how to do word count with one output file for each input file. Let me give you an example:

FILE_1 Dog Cat Dog Bull
FILE_2 Cow Ox Tiger Dog Cat
FILE_3 Dog Cow Ox Tiger Bull

This should give 3 output files, one for each input file, as follows:

Out_1 Dog 2,Cat 1,Bull 1
Out_2 Cow 1,Ox 1,Tiger 1,Dog 1,Cat 1
Out_3 Dog 1,Cow 1,Ox 1,Tiger 1,Bull 1

I went through the answers posted here: Hadoop MapReduce - one output file for each input, but couldn't grasp them properly.

Help please! Thanks

  • Which part couldn't you grasp properly? Commented Feb 26, 2014 at 15:29
  • Joao, as in: what is the corresponding code in the reducer function to access the values (from the mapper) separately for each input file? In short, kindly include the reducer code for that link too; it'd be helpful. Commented Feb 26, 2014 at 15:39
  • possible duplicate of Hadoop MapReduce - one output file for each input Commented Feb 26, 2014 at 17:27

2 Answers


Each reducer writes one output file, so the number of output files depends on the number of reducers.

(A) Assuming you want to process all three input files in a single MapReduce job:

At the very minimum, you must set the number of reducers equal to the number of output files you want.

Since you are trying to do word counts per file, and not across files, you will have to ensure that all the contents of a single file are processed by a single reducer. Using a custom partitioner is one way to do this, as sketched below.
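Here is a minimal sketch of that idea, assuming the mapper emits composite keys of the form "<fileName>*<word>" (as the second answer below does); the class name FilePartitioner is hypothetical:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Route every key from the same input file to the same reducer by
// partitioning on the file-name part of the composite key.
public class FilePartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String fileName = key.toString().split("\\*")[0];
        return (fileName.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

Register it in the driver with job.setPartitionerClass(FilePartitioner.class) and set job.setNumReduceTasks(...) to the number of input files. One caveat: hashing does not guarantee a collision-free mapping, so with few reducers two files can still land in the same partition and hence the same output file.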

(B) Another way is to simply run your MapReduce job three times, once for each input file, with the reducer count set to 1.
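A minimal driver sketch of this approach, assuming the TokenizerMapper and IntSumReducer classes from the standard Hadoop WordCount example; the input and output paths are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PerFileWordCount {
    public static void main(String[] args) throws Exception {
        // Hypothetical paths: one job per input file, one reducer per job.
        String[] inputs = { "in/FILE_1", "in/FILE_2", "in/FILE_3" };
        Configuration conf = new Configuration();
        for (int i = 0; i < inputs.length; i++) {
            Job job = Job.getInstance(conf, "wordcount-" + (i + 1));
            job.setJarByClass(PerFileWordCount.class);
            job.setMapperClass(WordCount.TokenizerMapper.class);  // from the Hadoop WordCount example
            job.setCombinerClass(WordCount.IntSumReducer.class);
            job.setReducerClass(WordCount.IntSumReducer.class);
            job.setNumReduceTasks(1);                             // one reducer => one output file
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(inputs[i]));
            FileOutputFormat.setOutputPath(job, new Path("Out_" + (i + 1)));
            if (!job.waitForCompletion(true)) System.exit(1);     // stop on first failure
        }
    }
}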



I am also a newbie in Hadoop and found this question very interesting. This is how I resolved it.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Multiwordcnt {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

            Configuration conf = new Configuration();
            // Parse generic options (e.g. -D flags) before creating the job.
            String[] userargs = new GenericOptionsParser(conf, args).getRemainingArgs();
            Job myJob = Job.getInstance(conf, "Multiwordcnt");

            myJob.setJarByClass(Multiwordcnt.class);
            myJob.setMapperClass(MyMapper.class);
            myJob.setReducerClass(MyReducer.class);
            myJob.setMapOutputKeyClass(Text.class);
            myJob.setMapOutputValueClass(IntWritable.class);

            myJob.setOutputKeyClass(Text.class);
            myJob.setOutputValueClass(IntWritable.class);

            myJob.setInputFormatClass(TextInputFormat.class);
            // LazyOutputFormat avoids creating empty part-r-* files
            // alongside the per-file outputs written by MultipleOutputs.
            LazyOutputFormat.setOutputFormatClass(myJob, TextOutputFormat.class);

            FileInputFormat.addInputPath(myJob, new Path(userargs[0]));
            FileOutputFormat.setOutputPath(myJob, new Path(userargs[1]));

            System.exit(myJob.waitForCompletion(true) ? 0 : 1);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final Text emitkey = new Text();
        private final IntWritable emitvalue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Use the file name (not the full URI) so it is a valid
            // base output path for MultipleOutputs in the reducer.
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                // Tag every word with its source file: "<fileName>*<word>".
                emitkey.set(fileName + "*" + tokenizer.nextToken());
                context.write(emitkey, emitvalue);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final Text emitkey = new Text();
        private final IntWritable emitvalue = new IntWritable();
        private MultipleOutputs<Text, IntWritable> multipleoutputs;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            multipleoutputs = new MultipleOutputs<Text, IntWritable>(context);
        }

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Split the composite key back into file name and word.
            String[] splitted = key.toString().split("\\*");
            String path = splitted[0];
            String word = splitted[1];
            emitkey.set(word);
            emitvalue.set(sum);
            // Write each count to an output file named after its source file.
            multipleoutputs.write(emitkey, emitvalue, path);
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            multipleoutputs.close();
        }
    }
}
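With the baseOutputPath variant of MultipleOutputs.write() used here, each input file yields an output file named after it in the job's output directory, e.g. FILE_1 produces FILE_1-r-00000. LazyOutputFormat (added in the driver above) keeps the default empty part-r-* files from being created as well. Note this gives exactly one output file per input file only if the input file names are unique.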

