Hadoop MapReduce

Map step: mapper.py
 
It reads data from STDIN, splits it into words, and writes a list of lines mapping each word to its (intermediate) count to STDOUT. The Map script does not compute an (intermediate) sum of a word's occurrences, though. Instead, it immediately outputs a <word> 1 tuple for every occurrence, even if a specific word appears multiple times in the input. In our case we let the subsequent Reduce step do the final sum count. Of course, you can change this behavior in your own scripts as you please, but we will keep it like that in this tutorial for didactic reasons. :-)
Make sure the file has execution permission (chmod +x mapper.py should do the trick) or you will run into problems.
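For example, an input line such as "foo foo bar" is emitted as three separate tab-delimited pairs (this is exactly what the script below produces; the summing happens later in the Reduce step):
foo 1
foo 1
bar 1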
 
 
[root@hostname hadoop-1.2.1]# cat mapper.py
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
[root@hostname hadoop-1.2.1]#
 
 
Reduce step: reducer.py

It will read the results of mapper.py from STDIN (so the output format of mapper.py and the expected input format of reducer.py must match), sum the occurrences of each word into a final count, and then output its results to STDOUT. Because Hadoop sorts the map output by key before it reaches the reducer, the script only needs to compare each word with the previous one.
Make sure the file has execution permission (chmod +x reducer.py should do the trick) or you will run into problems.
 
 
[root@hostname hadoop-1.2.1]# cat reducer.py
#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
[root@hostname hadoop-1.2.1]#
 
Test your code
I recommend testing your mapper.py and reducer.py scripts locally before using them in a MapReduce job. Otherwise your jobs might complete successfully but produce no result data at all, or not the results you expected.
Here are some ideas on how to test the functionality of the Map and Reduce scripts.
 
 
[root@hostname hadoop-1.2.1]# echo "foo foo quux labs foo bar quux" | /root/hadoop-1.2.1/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1
[root@hostname hadoop-1.2.1]#
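 
You can also test both scripts together locally. The reducer expects its input to be sorted by key, which Hadoop takes care of in a real job, so pipe the mapper output through sort first. Assuming both scripts live in /root/hadoop-1.2.1 as in the transcript above, something like this should work:
 
[root@hostname hadoop-1.2.1]# echo "foo foo quux labs foo bar quux" | /root/hadoop-1.2.1/mapper.py | sort -k1,1 | /root/hadoop-1.2.1/reducer.py
bar 1
foo 3
labs 1
quux 2
[root@hostname hadoop-1.2.1]#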
 
Run the MapReduce job
Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As mentioned above, we leverage the Hadoop Streaming API to pass data between our Map and Reduce code via STDIN and STDOUT.
 
[root@hostname hadoop-1.2.1]# hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -file mapper.py -mapper mapper.py -file reducer.py -reducer reducer.py -input /home/hadoop/MapReduce -output /home/hadoop/MapReduce/op
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-root/hadoop-unjar4598109263115920539/] [] /tmp/streamjob2989678758078645316.jar tmpDir=null
14/07/09 10:45:35 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/07/09 10:45:35 WARN snappy.LoadSnappy: Snappy native library not loaded
14/07/09 10:45:35 INFO mapred.FileInputFormat: Total input paths to process : 1
14/07/09 10:45:35 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-root/mapred/local]
14/07/09 10:45:35 INFO streaming.StreamJob: Running job: job_201407090310_0002
14/07/09 10:45:35 INFO streaming.StreamJob: To kill this job, run:
14/07/09 10:45:35 INFO streaming.StreamJob: /root/hadoop-1.2.1/libexec/../bin/hadoop job -Dmapred.job.tracker=master:9001 -kill job_201407090310_0002
14/07/09 10:45:35 INFO streaming.StreamJob: Tracking URL: http://hostname.nam.nsroot.net:50030/jobdetails.jsp?jobid=job_201407090310_0002
14/07/09 10:45:36 INFO streaming.StreamJob: map 0% reduce 0%
14/07/09 10:45:41 INFO streaming.StreamJob: map 50% reduce 0%
14/07/09 10:45:42 INFO streaming.StreamJob: map 100% reduce 0%
14/07/09 10:45:51 INFO streaming.StreamJob: map 100% reduce 33%
14/07/09 10:45:52 INFO streaming.StreamJob: map 100% reduce 100%
14/07/09 10:45:55 INFO streaming.StreamJob: Job complete: job_201407090310_0002
14/07/09 10:45:55 INFO streaming.StreamJob: Output: /home/hadoop/MapReduce/op
[root@hostname hadoop-1.2.1]#
[root@hostname hadoop-1.2.1]# hadoop dfs -ls /home/hadoop/MapReduce/op
Found 3 items
-rw-r--r-- 2 root supergroup 0 2014-07-09 10:45 /home/hadoop/MapReduce/op/_SUCCESS
drwxr-xr-x - root supergroup 0 2014-07-09 10:45 /home/hadoop/MapReduce/op/_logs
-rw-r--r-- 2 root supergroup 55 2014-07-09 10:45 /home/hadoop/MapReduce/op/part-00000
[root@hostname hadoop-1.2.1]#
 
[root@hostname hadoop-1.2.1]# hadoop dfs -cat /home/hadoop/MapReduce/op/part-00000
Cluster 3
Hadoop 3
MapReduce 3
Multi-node 3
job 3
on 3
[root@hostname hadoop-1.2.1]#
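 
If you prefer to inspect the result on the local filesystem rather than via hadoop dfs -cat, -getmerge copies the part files of the output directory into a single local file. The local target path below is just an example; adjust it as needed:
 
[root@hostname hadoop-1.2.1]# hadoop dfs -getmerge /home/hadoop/MapReduce/op /tmp/mapreduce-output.txt
[root@hostname hadoop-1.2.1]#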
