Writing a Hadoop MapReduce Program in Python

Map step: mapper.py

The mapper reads data from STDIN, splits it into words, and writes a list of lines mapping each word to its (intermediate) count to STDOUT. It does not compute an (intermediate) sum of a word's occurrences; instead it emits a <word> 1 tuple immediately, even if a given word occurs multiple times in the input, and lets the subsequent Reduce step do the final sum. You can of course change this behavior in your own scripts as you please, but we will keep it like that in this tutorial for didactic reasons (a pre-aggregating variant is sketched after the listing below). 🙂
Make sure the file has execute permission (chmod +x mapper.py should do the trick) or you will run into problems.

[root@12d4-dl585-04 hadoop-1.2.1]# cat mapper.py
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
[root@12d4-dl585-04 hadoop-1.2.1]#
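
As mentioned above, you can change this behavior so that the mapper pre-aggregates counts itself instead of emitting one "<word> 1" line per occurrence. A minimal sketch of such an optional variant (not used in the rest of this tutorial) could look like this:

#!/usr/bin/env python
# Optional mapper variant: sum each word's occurrences locally before
# emitting, instead of printing "<word> 1" once per occurrence.

import sys
from collections import defaultdict

counts = defaultdict(int)

# input comes from STDIN (standard input)
for line in sys.stdin:
    for word in line.strip().split():
        counts[word] += 1

# emit one tab-delimited "<word> <partial count>" line per distinct word;
# reducer.py sums these partial counts exactly as it sums the 1s above
for word, count in counts.items():
    print '%s\t%s' % (word, count)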

Reduce step: reducer.py
The reducer reads the results of mapper.py from STDIN (so the output format of mapper.py and the expected input format of reducer.py must match), sums the occurrences of each word into a final count, and writes its results to STDOUT.
Make sure the file has execute permission (chmod +x reducer.py should do the trick) or you will run into problems.

[root@12d4-dl585-04 hadoop-1.2.1]# cat reducer.py
#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
[root@12d4-dl585-04 hadoop-1.2.1]#
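
For completeness, the same reduce logic can also be written with itertools.groupby, using operator.itemgetter (imported above) as the grouping key. This is only an optional sketch and is not used in the rest of the tutorial:

#!/usr/bin/env python
# Optional reducer variant using itertools.groupby. Like the IF-switch
# above, it relies on Hadoop sorting the map output by key (the word)
# before it is passed to the reducer.

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(stream, separator='\t'):
    # yield [word, count] pairs from the tab-delimited mapper output
    for line in stream:
        yield line.rstrip('\n').split(separator, 1)

data = read_mapper_output(sys.stdin)
for word, group in groupby(data, itemgetter(0)):
    try:
        total = sum(int(count) for _, count in group)
    except ValueError:
        # silently ignore malformed lines (missing tab or non-numeric count)
        continue
    print '%s\t%s' % (word, total)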

Test your code
I recommend testing your mapper.py and reducer.py scripts locally before using them in a MapReduce job. Otherwise a job might complete successfully yet produce no output data at all, or not the results you would have expected.
Here are some ideas on how to test the functionality of the Map and Reduce scripts.

[root@12d4-dl585-04 hadoop-1.2.1]# echo "foo foo quux labs foo bar quux" | /root/hadoop-1.2.1/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1
[root@12d4-dl585-04 hadoop-1.2.1]#
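
You can test reducer.py in the same way by piping the mapper output through sort first, since Hadoop performs that sort between the map and reduce phases. With the same sample input, a command along the following lines should print the final counts:

[root@12d4-dl585-04 hadoop-1.2.1]# echo "foo foo quux labs foo bar quux" | /root/hadoop-1.2.1/mapper.py | sort -k1,1 | /root/hadoop-1.2.1/reducer.py
bar 1
foo 3
labs 1
quux 2
[root@12d4-dl585-04 hadoop-1.2.1]#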

Run the MapReduce job
Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As noted above, we use the Hadoop Streaming API to pass data between our Map and Reduce code via STDIN and STDOUT.
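
If your input text files are not in HDFS yet, copy them into the input directory first. The local file name below is only an example; any plain-text input will do:

[root@12d4-dl585-04 hadoop-1.2.1]# hadoop dfs -mkdir /home/hadoop/MapReduce
[root@12d4-dl585-04 hadoop-1.2.1]# hadoop dfs -copyFromLocal /tmp/sample.txt /home/hadoop/MapReduce/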

[root@12d4-dl585-04 hadoop-1.2.1]# hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -file mapper.py -mapper mapper.py -file reducer.py -reducer reducer.py -input /home/hadoop/MapReduce -output /home/hadoop/MapReduce/op
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-root/hadoop-unjar4598109263115920539/] [] /tmp/streamjob2989678758078645316.jar tmpDir=null
14/07/09 10:45:35 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/07/09 10:45:35 WARN snappy.LoadSnappy: Snappy native library not loaded
14/07/09 10:45:35 INFO mapred.FileInputFormat: Total input paths to process : 1
14/07/09 10:45:35 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-root/mapred/local]
14/07/09 10:45:35 INFO streaming.StreamJob: Running job: job_201407090310_0002
14/07/09 10:45:35 INFO streaming.StreamJob: To kill this job, run:
14/07/09 10:45:35 INFO streaming.StreamJob: /root/hadoop-1.2.1/libexec/../bin/hadoop job -Dmapred.job.tracker=master:9001 -kill job_201407090310_0002
14/07/09 10:45:35 INFO streaming.StreamJob: Tracking URL: http://12d4-dl585-04.nam.nsroot.net:50030/jobdetails.jsp?jobid=job_201407090310_0002
14/07/09 10:45:36 INFO streaming.StreamJob: map 0% reduce 0%
14/07/09 10:45:41 INFO streaming.StreamJob: map 50% reduce 0%
14/07/09 10:45:42 INFO streaming.StreamJob: map 100% reduce 0%
14/07/09 10:45:51 INFO streaming.StreamJob: map 100% reduce 33%
14/07/09 10:45:52 INFO streaming.StreamJob: map 100% reduce 100%
14/07/09 10:45:55 INFO streaming.StreamJob: Job complete: job_201407090310_0002
14/07/09 10:45:55 INFO streaming.StreamJob: Output: /home/hadoop/MapReduce/op
[root@12d4-dl585-04 hadoop-1.2.1]#
[root@12d4-dl585-04 hadoop-1.2.1]# hadoop dfs -ls /home/hadoop/MapReduce/op
Found 3 items
-rw-r--r--   2 root supergroup          0 2014-07-09 10:45 /home/hadoop/MapReduce/op/_SUCCESS
drwxr-xr-x   - root supergroup          0 2014-07-09 10:45 /home/hadoop/MapReduce/op/_logs
-rw-r--r--   2 root supergroup         55 2014-07-09 10:45 /home/hadoop/MapReduce/op/part-00000
[root@12d4-dl585-04 hadoop-1.2.1]#

[root@12d4-dl585-04 hadoop-1.2.1]# hadoop dfs -cat /home/hadoop/MapReduce/op/part-00000
Cluster 3
Hadoop 3
MapReduce 3
Multi-node 3
job 3
on 3
[root@12d4-dl585-04 hadoop-1.2.1]#
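
To work with the result outside of HDFS, you can copy the part file back to the local filesystem; the local target path below is only an example:

[root@12d4-dl585-04 hadoop-1.2.1]# hadoop dfs -copyToLocal /home/hadoop/MapReduce/op/part-00000 /tmp/wordcount-result.txt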
