
I'm trying to implement a simple Hadoop MapReduce example using Cloudera 5.5.0. The map and reduce steps are implemented in Python 2.6.6.

Problem:

  • If the scripts are executed on the Unix command line, they work perfectly fine and produce the expected output:

cat join2*.txt | ./join3_mapper.py | sort | ./join3_reducer.py

  • But executing the scripts as a Hadoop task fails badly:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input /user/cloudera/inputTV/join2_gen*.txt -output /user/cloudera/output_tv -mapper /home/cloudera/join3_mapper.py -reducer /home/cloudera/join3_reducer.py -numReduceTasks 1

16/01/06 12:32:32 INFO mapreduce.Job: Task Id : attempt_1452069211060_0026_r_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134)
        at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:244)
        at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

  • The mapper alone works: if the Hadoop command is executed with -numReduceTasks 0, the job runs only the map step, finishes successfully, and the output directory contains the result files from the map step.

  • So I guess something must be wrong with the reduce step?

  • The stderr logs in Hue show nothing relevant:

Log Upload Time: Wed Jan 06 12:33:10 -0800 2016
Log Length: 222
log4j:WARN No appenders could be found for logger (org.apache.hadoop.ipc.Server).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Code of the scripts. 1st file: join3_mapper.py

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()          # strip out carriage return
    tuple2 = line.split(",")     # split line into key and value, returns a list

    if len(tuple2) == 2:
        key = tuple2[0]
        value = tuple2[1]
        if value == 'ABC':
            print('%s\t%s' % (key, value))
        elif value.isdigit():
            print('%s\t%s' % (key, value))
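As a quick sanity check, the mapper's filtering logic can be exercised without Hadoop at all. The sketch below wraps the per-line logic in a function and runs it over made-up sample lines (the real join2_gen*.txt contents are an assumption here; only the "key,value" shape is taken from the script):

```python
# Sketch: the mapper's per-line logic as a testable function.
# Sample data is invented; the real input files are assumed to be "key,value" lines.
def map_line(line):
    line = line.strip()
    tuple2 = line.split(",")
    if len(tuple2) == 2:
        key, value = tuple2
        if value == 'ABC' or value.isdigit():
            return '%s\t%s' % (key, value)
    return None  # line is dropped

sample = ["show1,100", "show1,ABC", "show2,xyz", "malformed"]
results = [r for r in (map_line(s) for s in sample) if r is not None]
print(results)
```

Running this locally confirms which lines survive the mapper's filter before involving the cluster at all.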

The 2nd file: join3_reducer.py

#!/usr/bin/env python
import sys

last_key      = None              # initialize these variables
running_total = 0
abcFound      = False
this_key      = None

# -----------------------------------
# Loop over the input
# -----------------------------------
for input_line in sys.stdin:
    input_line = input_line.strip()

    # --------------------------------
    # Get Next Key value pair, splitting at tab
    # --------------------------------
    tuple2 = input_line.split("\t") 

    this_key = tuple2[0]    
    value = tuple2[1]
    if value.isdigit():
        value = int(value) 

    # ---------------------------------
    # Key Check part
    #    if this current key is same 
    #          as the last one Consolidate
    #    otherwise  Emit
    # ---------------------------------
    if last_key == this_key:
        if value == 'ABC':            # filter for only ABC in TV shows
            abcFound = True
        else:
            if isinstance(value, (int, long)):
                running_total += value

    else:
        if last_key:                  # if this key differs from the last key, and the
                                      # previous (i.e. last) key is not empty,
                                      # then output the previous <key running-count>
            if abcFound:
                print('%s\t%s' % (last_key, running_total))
                abcFound = False

        running_total = value         # reset values
        last_key = this_key

if last_key == this_key:
    print('%s\t%s' % (last_key, running_total) )
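For what it's worth, "subprocess failed with code 1" from PipeMapRed only means the Python process exited with an uncaught exception; the traceback itself lands in the task's stderr log, not in the Java stack trace. One way a streaming reducer can crash under Hadoop but not in the local `cat | sort` pipeline is a malformed or empty line reaching it, where `tuple2[1]` raises IndexError. A defensive parse is a cheap safeguard (a sketch, not a confirmed diagnosis of this particular failure):

```python
# Sketch: defensive parsing of one reducer input line.
# A line without a tab would make tuple2[1] raise IndexError, which kills the
# subprocess and surfaces as "subprocess failed with code 1" in the job log.
def parse_kv(input_line):
    tuple2 = input_line.strip().split("\t")
    if len(tuple2) != 2:
        return None          # skip malformed/empty lines instead of crashing
    return tuple2[0], tuple2[1]

print(parse_kv("show1\t100"))
print(parse_kv("garbage"))
```

In the reducer loop, a `None` result would simply be skipped with `continue` before the key/value handling.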

I have tried various ways of declaring the input files to the hadoop command, with no difference and no success.

What am I doing wrong? Hints and ideas are very much appreciated, thank you.

  • Don't you need toolrunner to be able to run a jar file from the command line? Commented Jan 6, 2016 at 21:02
  • Also, aren't jar files for Java programs? Commented Jan 6, 2016 at 21:05
  • I'm not executing a jar file myself; I'm executing the hadoop command and telling Hadoop to execute the declared jar file. The parameters following the library path belong to hadoop-streaming.jar and relate to the MapReduce action being executed. And yes, jar files are Java programs. Commented Jan 6, 2016 at 21:11

2 Answers


What a lucky punch: after fighting with this one for days, I now have it working.

Since the local (unix) execution of

cat join2_gen*.txt | ./join2_mapper.py | sort | ./join2_reducer.py

worked fine, I had the idea to use one merged input file instead of the six provided input files, so:

cat join2_gen*.txt >> mergedInputFile.txt

hdfs dfs -put mergedInputFile.txt /user/cloudera/input

Then, executing the very same hadoop command again and directing the input to mergedInputFile.txt in the input folder: perfect result, no problem, no exception, job done.

For me this raises the question:

  • Why does it work with one merged input file but not with the six smaller files? No idea (yet).

1 Comment

I'm having the same problem! It works perfectly in serial mode.

Try putting all the input text files in one directory and then passing the directory as the input. That way you won't have to merge all your input files.

