I have two mapper classes and two reducer classes. I want the flow to be: MapperOne --> ReducerOne --> MapperTwo --> ReducerTwo.

Here is my driver class code.

public class StockDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        System.out.println(" Driver invoked------");
        Configuration config = new Configuration();
        config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
        config.set("mapred.textoutputformat.separator", " --> ");

        String inputPath="In\\NYSE_daily_prices_Q_less.csv";

        String outpath = "C:\\Users\\Desktop\\Hadoop\\run1";
        String outpath2 = "C:\\Users\\Desktop\\Hadoop\\run2";

        Job job1 = new Job(config,"Stock Analysis: Creating key values");
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(StockDetailsTuple.class);

        job1.setMapperClass(StockMapperOne.class);
        job1.setReducerClass(StockReducerOne.class);

        FileInputFormat.setInputPaths(job1, new Path(inputPath));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(outpath));
        //FileOutputFormat.setOutputPath(job1, new Path(outpath));

        //THE SECOND MAP_REDUCE TO DO CALCULATIONS

        Job job2 = new Job(config,"Stock Analysis: Calculating Covariance");
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setMapperClass(StockMapperTwo.class);
        job2.setReducerClass(StockReducerTwo.class);

        SequenceFileInputFormat.setInputPaths(job2, new Path(outpath));
        FileOutputFormat.setOutputPath(job2, new Path(outpath2));


        System.out.println(job1.waitForCompletion(true));
        System.out.println(job2.waitForCompletion(true));
    }

}

MY MapperOne CLASS

public class StockMapperOne extends Mapper<LongWritable, Text, Text, StockDetailsTuple> {

    private StockDetailsTuple stock= new StockDetailsTuple();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

        String valueString = value.toString();
        String[] tokenArray = valueString.split(",");

        String dateOfStock= tokenArray[2];
        int month = Integer.parseInt(dateOfStock.substring(5, 7));
        int year = Integer.parseInt(dateOfStock.substring(0,4));

        stock.setStockDate(dateOfStock); 
        stock.setStockName(tokenArray[1]);
        stock.setStockPrice(Float.parseFloat(tokenArray[4]));
        stock.setMonthNum(month);
        stock.setYear(year);
        System.out.println(" Date of stock: "+dateOfStock + "  stock:   "+stock);
        context.write(new Text(dateOfStock.trim()), stock);

    }
}

REDUCER ONE CLASS (I am setting the key,value pair as TEXT,TEXT)

public class StockReducerOne extends Reducer<Text, StockDetailsTuple, Text, Text> {

    public void reduce(Text key, Iterable<StockDetailsTuple> values, Context context) throws IOException, InterruptedException{
            for(StockDetailsTuple val: values){
                System.out.println("VAL : "+ val);
                priceMap.put(val.getStockName(),val.getStockPrice());
                stockGroups = stockGroups.append(val.getStockName()).append(":"); 
                month=val.getMonthNum();
                count++;

            }

            if(count>1){
                stockGroupAfterPermutations =doPermutation(stockGroups.toString());
                formNewKey(context,this.stockGroupAfterPermutations, priceMap, month);

            }
    }


    private void formNewKey(Context context,List<String> stockGroup, Map<String, Float> priceMap2,int month2) 
            throws IOException, InterruptedException {

        System.out.println(" FORMING NEW KEY----------");
        String tempKey=null,tempVal = null ;
        for(String stkgrp:stockGroup){
            String[] splitTokens = stkgrp.split(",");
            //The below line gives the key as 3 QRR,QTM 
            // value as 12.22 13.33
            Arrays.sort(splitTokens);
            //System.out.println(" SORTED ARRAY:   -->  "+ splitTokens);
            tempKey=String.valueOf(month2)+","+splitTokens[0]+","+splitTokens[1];
            tempVal=splitTokens[0]+","+String.valueOf(priceMap2.get(splitTokens[0]))+","+splitTokens[1]+","+String.valueOf(priceMap2.get(splitTokens[1]));
            //Finally our key value pair looks like
            //7,QTM,QXM --> QTM,3.22,QXM,9.61

        }
        System.out.println(" NEW KEY: "+tempKey +"  NEW VAL: "+ tempVal);
        context.write(new Text(tempKey), new Text(tempVal));
    }


}


MY WRITABLE StockDetailsTuple Class
-------------------------------------

public class StockDetailsTuple implements Writable{

    private String stockName;
    private int monthNum;
    private int year;
    private float stockPrice;
    private String stockDate;

    @Override
    public void readFields(DataInput in) throws IOException {
        this.monthNum=in.readInt();
        this.year=in.readInt();
        this.stockPrice=in.readFloat();
        this.stockName=in.readUTF();
        this.stockDate=in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.monthNum);
        out.writeInt(this.year);
        out.writeFloat(this.stockPrice);
        out.writeUTF(this.stockName);
        out.writeUTF(this.stockDate);

    }

    public String getStockName() {
        return stockName;
    }

    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    public int getMonthNum() {
        return monthNum;
    }

    public void setMonthNum(int monthNum) {
        this.monthNum = monthNum;
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public float getStockPrice() {
        return stockPrice;
    }

    public void setStockPrice(float stockPrice) {
        this.stockPrice = stockPrice;
    }

    public String getStockDate() {
        return stockDate;
    }

    public void setStockDate(String stockDate) {
        this.stockDate = stockDate;
    }

    @Override
    public String toString() {
        return "StockDetailsTuple [stockName=" + stockName + ", monthNum="
                + monthNum + ", year=" + year + ", stockPrice=" + stockPrice
                + ", stockDate=" + stockDate + "]";
    }



}

When I run this code, I am getting an exception at ReducerOne.

13/07/15 23:11:49 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: wrong value class: org.apache.hadoop.io.Text is not class com.assignment.StockDetailsTuple
    at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1050)
    at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:74)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:588)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)

If I change the output value type in ReducerOne to StockDetailsTuple, the code runs. What am I missing here? I want the reducer's output key and value to be Text, Text. Please help.

1 Answer

setOutputKeyClass and setOutputValueClass define the types of the objects emitted by the reducer. Your driver sets them to Text, StockDetailsTuple, but according to the source of your StockReducerOne they should be Text, Text.

Use setMapOutputKeyClass and setMapOutputValueClass to specify the mapper's output types if they differ from the reducer's.
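To illustrate why the separate map output setting matters, here is a minimal plain-Java sketch of the type check behind the "wrong value class" message. It does not use Hadoop: JobTypes, TextValue, TupleValue and writeReducerOutput are hypothetical stand-ins invented for this example, and the fallback from the map output class to the final output class is modeled on how the job settings are understood to behave.

```java
import java.io.IOException;

public class OutputTypeSketch {

    // Hypothetical stand-ins for the question's Text and StockDetailsTuple types.
    static final class TextValue { }
    static final class TupleValue { }

    /** Hypothetical model of a job's value type settings; not a Hadoop class. */
    static final class JobTypes {
        private Class<?> outputValueClass;
        private Class<?> mapOutputValueClass; // null means "not set"

        void setOutputValueClass(Class<?> c) { outputValueClass = c; }
        void setMapOutputValueClass(Class<?> c) { mapOutputValueClass = c; }

        Class<?> getOutputValueClass() { return outputValueClass; }

        // Falls back to the final output class when no map output class was set.
        Class<?> getMapOutputValueClass() {
            return mapOutputValueClass != null ? mapOutputValueClass : outputValueClass;
        }
    }

    /** Rejects reducer output whose class differs from the declared output class. */
    static void writeReducerOutput(JobTypes job, Object value) throws IOException {
        if (value.getClass() != job.getOutputValueClass()) {
            throw new IOException("wrong value class: " + value.getClass().getName()
                    + " is not " + job.getOutputValueClass());
        }
    }

    public static void main(String[] args) throws IOException {
        // Settings as in the question: one value class shared by mapper and reducer.
        JobTypes broken = new JobTypes();
        broken.setOutputValueClass(TupleValue.class);
        try {
            writeReducerOutput(broken, new TextValue());
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Settings as in the answer: map output type and final output type differ.
        JobTypes fixed = new JobTypes();
        fixed.setMapOutputValueClass(TupleValue.class);
        fixed.setOutputValueClass(TextValue.class);
        writeReducerOutput(fixed, new TextValue());
        System.out.println("accepted");
    }
}
```

In the question's driver, the corresponding change would presumably be to call job1.setMapOutputKeyClass(Text.class) and job1.setMapOutputValueClass(StockDetailsTuple.class), and to pass Text.class to job1.setOutputValueClass.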

PS: your StockReducerOne class modifies instance variables (priceMap, count, ...) in its reduce method, which you probably don't want to do. At the very least, reset them to their initial values at the start of each reduce invocation; otherwise state from one key leaks into the next and you might get unexpected results.
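The effect of leftover state can be shown with a small plain-Java sketch. It assumes, as is usually the case, that one reducer object handles several keys in turn. LeakyReducer and CleanReducer are hypothetical classes written for this illustration, and the dates and prices are made-up sample data.

```java
import java.util.Arrays;
import java.util.List;

public class ReducerStateSketch {

    /** Keeps its counter in a field, so the value carries over between reduce() calls. */
    static final class LeakyReducer {
        private int count = 0;

        int reduce(String key, List<Float> prices) {
            for (Float ignored : prices) {
                count++;
            }
            return count;
        }
    }

    /** Declares the counter inside reduce(), so every key starts from zero. */
    static final class CleanReducer {
        int reduce(String key, List<Float> prices) {
            int count = 0;
            for (Float ignored : prices) {
                count++;
            }
            return count;
        }
    }

    public static void main(String[] args) {
        // Two keys handled by the same reducer object: two prices, then one price.
        List<Float> firstDay = Arrays.asList(12.22f, 13.33f);
        List<Float> secondDay = Arrays.asList(3.22f);

        LeakyReducer leaky = new LeakyReducer();
        leaky.reduce("2010-02-08", firstDay);
        // Prints 3: the two values from the first key are still counted.
        System.out.println("leaky count for second key: " + leaky.reduce("2010-02-09", secondDay));

        CleanReducer clean = new CleanReducer();
        clean.reduce("2010-02-08", firstDay);
        // Prints 1: only the values of the second key are counted.
        System.out.println("clean count for second key: " + clean.reduce("2010-02-09", secondDay));
    }
}
```

With a check such as count > 1 in the reducer, the leaked counter would let a key with a single value through, which is the kind of unexpected result the note above warns about.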


1 Comment

Thanks for your comments. Yes, I forgot to set the map output key and map output value classes, and I was able to resolve this error. One more problem I am facing is that my second mapper is not getting invoked. Why is that happening?
