0

I am a beginner in MapReduce programming and have written the following Java program to run on a Hadoop cluster comprising 1 NameNode and 3 DataNodes:

package trial;

import java.io.IOException;
import java.util.*;
import java.lang.Iterable;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;


public class Trial 
{

public static class MapA extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
{

    public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
    {
       String[] rows = value.toString().split("\r?\n");          
       for(int i=0;i<rows.length;i++)
       {
           String[] cols = rows[i].toString().split(",");

           String v=cols[0];
           for(int j=1;j<cols.length;j++)
           {
               String k =j+","+cols[j];
               output.collect(new Text(k),new Text(v));
           }
       }


   }
}


public static class ReduceA extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException 
        {
            int count =0;                
            String[] attr = key.toString().split(",");      
            List<String> list = new ArrayList<String>();

           while(values.hasNext())               
            {
                list.add((values.next()).toString());
                count++;

            }

           String v=Integer.toString(count);
           for(String s:list)
           { 
               output.collect(new Text(s),new Text(v));
           }

        }   

}




/**
 * Runs the two chained jobs in sequence.
 *
 * The "Trial" job reads {@code args[0]} and writes {@code args[1]}; the "Final" job then
 * reads {@code args[1]} and writes {@code args[2]}.
 *
 * @param args input path, intermediate path and final output path, in that order
 * @throws IOException if thrown while submitting or running a job
 * @throws IllegalArgumentException if fewer than three paths are supplied
 */
public static void main(String[] args) throws IOException
{
    // Fail fast: args[2] would otherwise only be read after the first job has already run.
    if (args == null || args.length < 3)
    {
        throw new IllegalArgumentException(
                "Usage: Trial <input path> <intermediate path> <output path>");
    }

    JobClient.runJob(buildJob(Trial.class, "Trial", MapA.class, ReduceA.class, args[0], args[1]));

    JobClient.runJob(buildJob(Final.class, "Final", Final.MapB.class, Final.ReduceB.class, args[1], args[2]));
}

/**
 * Builds the configuration shared by both jobs: Text keys and values, text input and
 * output formats, one input path and one output path.
 */
private static JobConf buildJob(Class<?> jarClass, String jobName,
        Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass,
        String inputPath, String outputPath)
{
    JobConf conf = new JobConf(jarClass);
    conf.setJobName(jobName);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(mapperClass);
    conf.setReducerClass(reducerClass);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(inputPath));
    FileOutputFormat.setOutputPath(conf, new Path(outputPath));

    return conf;
}


  }  

class Final
{

public static class MapB extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
{

    public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
    {
       String[] r = value.toString().split("\r?\n");
       String[] p1= new String[5];

       for(int i=0;i<r.length;i++)
       {
           p1 = r[i].split("\t");               
           output.collect(new Text(p1[0]),new Text(p1[1]));
       }

   }
}

 /**
  * Second-pass reducer.
  *
  * Sums the integer counts received for a key and emits that sum divided by the number of
  * attribute columns per row. The divisor is read from the job property
  * {@link #ATTRIBUTE_COUNT_KEY} and defaults to 3 when the property is not set.
  */
 public static class ReduceB extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

        /** Job property that holds the number of attribute columns per row. */
        public static final String ATTRIBUTE_COUNT_KEY = "trial.attribute.count";

        private static final int DEFAULT_ATTRIBUTE_COUNT = 3;

        private int attributeCount = DEFAULT_ATTRIBUTE_COUNT;

        /**
         * Reads the divisor from the job configuration.
         *
         * @throws IllegalArgumentException if the configured attribute count is not positive
         */
        @Override
        public void configure(JobConf job)
        {
            int configured = job.getInt(ATTRIBUTE_COUNT_KEY, DEFAULT_ATTRIBUTE_COUNT);
            if (configured <= 0)
            {
                throw new IllegalArgumentException(
                        ATTRIBUTE_COUNT_KEY + " must be positive but was " + configured);
            }
            attributeCount = configured;
        }

        /**
         * Emits the key with the average of its counts as a float rendered as text.
         *
         * @throws NumberFormatException if a value is not a decimal integer
         */
        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException
        {
           // long accumulator so that many large counts for one key cannot overflow.
           long sum = 0;
           while (values.hasNext())
           {
               sum += Integer.parseInt(values.next().toString());
           }
           float avf = (float) sum / attributeCount;
           output.collect(key, new Text(Float.toString(avf)));
        }

}

}

The program is run on a dataset like this:

ID1,1,2,3 
ID2,1,3,2
ID3,2,2,3

Each row has an ID followed by 3 comma-separated attributes. My problem is to find the frequency of each attribute's value (counted down the column, not across the row, if the dataset is seen as a 2-D array) for each ID, then sum those frequencies for the ID and take their average. Thus, for the above dataset:

ID1 : (2+2+2)/3 = 2
ID2 : (2+1+1)/3 = 1.33
ID3 : (1+2+2)/3 = 1.67

The above code is working well with small datasets like 200-500MB. But for datasets above 1GB I am getting an error like this:

 map 100% reduce 50%
       14/04/12 12:33:06 INFO mapred.JobClient: Task Id :  attempt_201404121146_0002_r_000001_0, Status : FAILED
      Error: Java heap space
      attempt_201404121146_0002_r_000001_0: Exception in thread  "LeaseRenewer:hdfs@NameNode:8020" java.lang.OutOfMemoryError: Java heap space
      attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:397)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:436)
      attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:70)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:297)
     attempt_201404121146_0002_r_000001_0:     at java.lang.Thread.run(Thread.java:662)
     attempt_201404121146_0002_r_000001_0: Exception in thread "Thread for syncLogs" java.lang.OutOfMemoryError: Java heap space
     attempt_201404121146_0002_r_000001_0:     at java.util.AbstractList.iterator(AbstractList.java:273)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.mapred.TaskLog.syncLogs(TaskLog.java:363)
     attempt_201404121146_0002_r_000001_0:     at org.apache.hadoop.mapred.Child$3.run(Child.java:158)
     14/04/12 12:33:10 INFO mapred.JobClient:  map 100% reduce 33%
     14/04/12 12:33:12 INFO mapred.JobClient: Task Id :    attempt_201404121146_0002_r_000003_0, Status : FAILED
     Error: Java heap space
      attempt_201404121146_0002_r_000003_0: log4j:WARN No appenders could be found for logger (org.apache.hadoop.mapred.Task).
     attempt_201404121146_0002_r_000003_0: log4j:WARN Please initialize the log4j system properly.
      attempt_201404121146_0002_r_000003_0: log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
     14/04/12 12:33:15 INFO mapred.JobClient:  map 100% reduce 16%
     14/04/12 12:33:16 INFO mapred.JobClient:  map 100% reduce 18%
     14/04/12 12:33:16 INFO mapred.JobClient: Task Id : attempt_201404121146_0002_r_000000_0, Status : FAILED
     Error: Java heap space
      attempt_201404121146_0002_r_000000_0: Exception in thread "LeaseRenewer:hdfs@NameNode:8020" java.lang.OutOfMemoryError: Java heap space
     attempt_201404121146_0002_r_000000_0:     at java.lang.StringCoding.set(StringCoding.java:53)
     attempt_201404121146_0002_r_000000_0:     at java.lang.StringCoding.decode(StringCoding.java:171)
     attempt_201404121146_0002_r_000000_0:     at java.lang.String.<init>(String.java:443)
     attempt_201404121146_0002_r_000000_0:     at java.util.jar.Attributes.read(Attributes.java:401)
      attempt_201404121146_0002_r_000000_0:     at java.util.jar.Manifest.read(Manifest.java:182)
      attempt_201404121146_0002_r_000000_0:     at java.util.jar.Manifest.<init>(Manifest.java:52)
       attempt_201404121146_0002_r_000000_0:     at java.util.jar.JarFile.getManifestFromReference(JarFile.java:167)
       attempt_201404121146_0002_r_000000_0:     at java.util.jar.JarFile.getManifest(JarFile.java:148)
       attempt_201404121146_0002_r_000000_0:     at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:696)
       attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader.defineClass(URLClassLoader.java:228)
        attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
        attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
       attempt_201404121146_0002_r_000000_0:     at      java.security.AccessController.doPrivileged(Native Method)
       attempt_201404121146_0002_r_000000_0:     at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
     attempt_201404121146_0002_r_000000_0:     at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
     attempt_201404121146_0002_r_000000_0:     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
   attempt_201404121146_0002_r_000000_0:     at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
   attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:400)
   attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:436)
  attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:70)
  attempt_201404121146_0002_r_000000_0:     at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:297)
  attempt_201404121146_0002_r_000000_0:     at java.lang.Thread.run(Thread.java:662)
 14/04/12 12:33:21 INFO mapred.JobClient:  map 100% reduce 20%

I think my program is consuming too much memory and needs to be optimized. I even tried to solve this by increasing the Java heap space to 1024MB, but I still get the same error. The dataset I used was 1.4GB, with 5 crore (50 million) rows and 9 attributes per row, excluding the row ID. Since my problem is a Big Data problem, testing the code with small data is not a solution. Can you please suggest how I can optimise my code so that the memory issue is resolved? Thanks in advance.

9
  • 1
    In your first job you are keeping all the values corresponding to a specific key in a list. As you have 5 crore rows and each row has 9 attributes, the values corresponding to a single key will be too large for a normal Java List to hold in heap memory. That is the reason for the java.lang.OutOfMemoryError: Java heap space exception. You have to avoid keeping all the values corresponding to a key in an object on the Java heap. Commented Apr 13, 2014 at 14:10
  • I was getting this error after the reduce of my first MapReduce reached 66%. So that must be the problem. Commented Apr 16, 2014 at 12:05
  • Yes can you try change that list Commented Apr 16, 2014 at 13:19
  • @donut:According to my task of reduce function I need to traverse the iterator of values twice- once, to count the values and next, to output the iterator Text values as key and count as value. How do I traverse the iterator twice? Is there any alternative way that fulfils similar logic ? Commented Apr 17, 2014 at 5:39
  • @MonamiSen Could you give a clearer example case of what you're trying to achieve. There might be another way to go about it. Commented Apr 17, 2014 at 12:31

1 Answer 1

1

Since the option of traversing the iterator twice is not possible and your heap cannot handle the large amount of values stored in a list, I suggest you add an intermediary MapReduce step, giving a total of three MapReduce steps for your job.

My proposition is as follows :

  • Step 1
    Mapper 1 outputs attributeID + "," + value => UserID
    Reducer 1 computes the total count for each key (attributeID + "," + value). First, it outputs the attributeID + "," + value => UserID as received from Mapper 1. Secondly, it outputs "." + attributeID + "," + value => total_count. The dot is added as prefix to ensure that all total_counts arrive first to the next Reducer. This is guaranteed thanks to the sort phase.

  • Step 2
    Mapper 2 does nothing other than output every input it receives.
    Reducer 2 is guaranteed to receive the total_counts first. So as long as it's a row that corresponds to a total_count, it stores it in a HashMap (attributeID + "," + value => total_count). So as soon as it starts receiving the other rows, all it has to do is retrieve the corresponding total_count from the HashMap and output UserID => total_count.
    Note that only one Reducer should be used in this phase, so you have to set mapreduce.job.reduces to 1. You can reset it to your former value after this step.

  • Step 3
    Same as the second MapReduce step in your initial solution. Computes the average and outputs UserID => average.

This solution is quite optimistic, as it assumes that your heap can handle your HashMap. Give it a try and see what happens.

Here is a sample code :

public class Trial {

public static class MapA extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
{

public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
{
        String[] rows = value.toString().split("\r?\n");
        for (int i = 0; i < rows.length; i++) {
            String[] cols = rows[i].toString().split(",");

            String v = cols[0];
            for (int j = 1; j < cols.length; j++) {
                String k = j + "," + cols[j];
                output.collect(new Text(k), new Text(v));
            }
        }
}
}


public static class ReduceA extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        int count = 0;

        while (values.hasNext()) {
            output.collect(key, values.next());
            count++;
        }
        output.collect(new Text("." + key),
                new Text(count));
    }  

}


public static class MapB extends MapReduceBase implements Mapper<Text, Text, Text, Text> 
{

public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
    output.collect(key, value);
}
}


public static class ReduceB extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

private Map<String, Integer> total_count = new HashMap<String, Integer>();
private Set<String> attributes = new HashSet<String>(); // count the distinct number of attributes

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {

        String rKey = key.toString();
        if(rKey.startsWith(".")){
            while (values.hasNext()) {
                total_count.put(rKey.substring(1), Integer.valueOf(values.next().toString()));
                attributes.add(rKey.substring(1).split(",")[0]);
                return;
            }
        }
        while (values.hasNext()) {
            Text value = values.next();
            output.collect(value, new Text(Integer.toString(total_count.get(rKey))));
            output.collect(value, new Text("." + attributes.size())); // send the total number of attributes
        }
    }  
}


public static class MapC extends MapReduceBase implements Mapper<Text, Text, Text, Text> 
{

public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
        output.collect(key, value);
    }
}

/**
 * Step 3 reducer.
 *
 * For one key, adds up every plain numeric value and reads the attribute count from any
 * value that starts with "." (the last such value wins). Emits the key with the sum
 * divided by the attribute count as a double.
 */
public static class ReduceC extends MapReduceBase implements Reducer<Text, Text, Text, DoubleWritable>
{

    /**
     * @throws IllegalStateException if no positive attribute count was received for the key
     * @throws NumberFormatException if a value is not a decimal integer
     */
    @Override
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, DoubleWritable>output, Reporter reporter) throws IOException
    {
       long sum = 0;
       int nbAttributes = 0;
       while (values.hasNext()) {
           String value = values.next().toString();
           if (value.startsWith(".")) { // check if line corresponds to the total number of attributes
               nbAttributes = Integer.parseInt(value.substring(1));
           } else {
               sum += Long.parseLong(value);
           }
       }

       if (nbAttributes <= 0) {
           throw new IllegalStateException("No attribute count was received for key '" + key + "'");
       }

       // Cast before dividing: long / int would truncate the average to a whole number.
       output.collect(key, new DoubleWritable((double) sum / nbAttributes));
    }
}

} 
Sign up to request clarification or add additional context in comments.

2 Comments

Thank you so much for giving me an alternative logic. But I am not understanding some parts still. Does dot have a special meaning in mapreduce programming? How is it going to ensure that all total_counts arrive first to the next reducer ? and even if it is ensuring so how it is going to help? You are telling me to output (.attributeID,value) as key but then where am i going to get the attribute ID from? I already have finished traversing the iterator once. I am not getting any idea also how am i going to implement the second step.
It will be very much of help if you do a sample execution of your logic like i have explained mine in the previous comments. Also a few lines of rough code would be helpful for me . Thanx in advance.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.