I am running a simple MapReduce job written in Python, and I noticed that when I test the script locally, I get different output than when I run the job on Hadoop. My input looks like this:
key1 val1
key1 val2
key1 val3
key1 val4
key2 val1
key2 val3
key2 val5
key3 val5
key4 val4
My mapper builds a dictionary mapping each value to the list of keys it appears with (e.g. val1 -> key1,key2 ; val2 -> key1 ; val3 -> key1,key2 ...). Then, for each value in the dictionary, I print every possible pair of its keys. So the output of my mapper looks like:
key1_key2 1 # obtained from val1
key1_key2 1 # obtained from val3
key1_key4 1 # obtained from val4
key2_key3 1 # obtained from val5
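To make the intended expansion concrete, here is a standalone Python 3 sketch of the same pair-generation step; the `val_to_keys` dict is just an illustration of the mapping my mapper builds from the sample input above, not part of my actual script:

```python
from itertools import combinations

# Illustration only: val_to_keys stands in for the value -> keys
# mapping built from the sample input above.
val_to_keys = {
    "val1": ["key1", "key2"],
    "val2": ["key1"],
    "val3": ["key1", "key2"],
    "val4": ["key1", "key4"],
    "val5": ["key2", "key3"],
}

lines = []
for keys in val_to_keys.values():
    # every unordered pair of keys sharing a value yields one count of 1
    for a, b in combinations(keys, 2):
        lines.append("{0}_{1}\t1".format(a, b))

for line in sorted(lines):
    print(line)
```

A value with only one key (like val2) contributes no pairs, which matches the expected output above.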
The reducer counts the number of identical key pairs and prints the count. My mapper code is:
import sys

val_dic = dict()

def print_dic(dic):
    for val, key_array in dic.iteritems():
        for i in range(len(key_array) - 1):
            for j in range(i + 1, len(key_array)):
                key_pair = key_array[i] + "_" + key_array[j]
                print "{0}\t{1}".format(key_pair, "1")

for line in sys.stdin:
    key, val = line.strip().split("\t")
    if val not in val_dic:
        val_dic[val] = []
    val_dic[val].append(key)

print_dic(val_dic)
The reducer counts the identical key pairs on its (sorted) input:

import sys

current_pair = None
current_count = 0

for line in sys.stdin:
    key_pair, count = line.strip().split("\t")
    count = int(count)
    if current_pair == key_pair:
        current_count += count
    else:
        print "{0}\t{1}".format(current_pair, current_count)
        current_pair = key_pair
        current_count = count

print "{0}\t{1}".format(current_pair, current_count)
However, when I run it on Hadoop on a larger dataset, it seems that about half the results are missing. When I test it on the local machine using `cat input | mapper.py | sort | reducer.py > out-local`, it works fine if the input is reasonably small, but on bigger datasets (e.g. 1M entries) the local output file has almost twice as many entries as the one obtained from running the MapReduce job on Hadoop. Is there an error in the code, or am I missing something? Any help is highly appreciated.
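I also wondered whether it matters that Hadoop splits the input across several mappers, since my mapper aggregates over all of stdin. A minimal standalone Python 3 sketch of that scenario (`mapper_pairs` is a hypothetical helper mimicking my mapper's logic, not my actual script) does produce fewer pairs when the input is split:

```python
from itertools import combinations

def mapper_pairs(records):
    # Hypothetical helper mimicking my mapper: build val -> [keys]
    # from one input split, then emit one pair per key combination.
    val_dic = {}
    for key, val in records:
        val_dic.setdefault(val, []).append(key)
    return [a + "_" + b
            for keys in val_dic.values()
            for a, b in combinations(keys, 2)]

records = [("key1", "val1"), ("key1", "val3"),
           ("key2", "val1"), ("key2", "val3")]

# One mapper sees everything: both keys of val1/val3 co-occur.
print(len(mapper_pairs(records)))                                  # -> 2

# Two splits: each mapper sees only one key per value, so no pairs.
print(len(mapper_pairs(records[:2]) + mapper_pairs(records[2:])))  # -> 0
```

I don't know if this is actually what happens on my cluster, but it is the kind of difference between the local pipe and Hadoop that I can reproduce.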