Hadoop Streaming with Python

Hadoop Streaming
----------------
Hadoop streaming is a utility that comes with the Hadoop distribution. It allows developers to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer. For example:

hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.5.0.jar \
-input myInputDir \
-output myOutputDir \
-file /bin/cat -mapper cat \
-file /usr/bin/wc -reducer wc
 
Python - Hadoop Streaming
-------------------------
In this example, we use Python scripts as the mapper and the reducer. Both are executables that read their input from STDIN (line by line) and emit their output to STDOUT. The utility creates a Map/Reduce job, submits the job to a cluster, and monitors the progress of the job until it completes.
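The streaming contract described above can be tried without a cluster. The following is a minimal sketch, not part of the Hadoop utility: the function names `word_mapper`, `sum_reducer`, and `run_streaming_locally` are hypothetical, and sorting the mapper's output in memory only approximates the sort-by-key step that Hadoop performs between the map and reduce phases.

```python
def word_mapper(lines):
    """Emit one tab-delimited 'word<TAB>1' record per word."""
    for line in lines:
        for word in line.split():
            yield '%s\t%d' % (word, 1)


def sum_reducer(sorted_lines):
    """Sum the counts of consecutive records that share the same key."""
    current_word, current_count = None, 0
    for line in sorted_lines:
        word, count = line.split('\t', 1)
        if word == current_word:
            current_count += int(count)
        else:
            if current_word is not None:
                yield '%s\t%d' % (current_word, current_count)
            current_word, current_count = word, int(count)
    # flush the last key, if any input was seen
    if current_word is not None:
        yield '%s\t%d' % (current_word, current_count)


def run_streaming_locally(lines, mapper, reducer):
    """Assumed local stand-in for a streaming job: map, sort, then reduce."""
    mapped = list(mapper(lines))
    # sort records by key (the text before the first tab)
    mapped.sort(key=lambda record: record.split('\t', 1)[0])
    return list(reducer(mapped))


if __name__ == '__main__':
    sample = ['to be or not to be', 'that is the question']
    for record in run_streaming_locally(sample, word_mapper, sum_reducer):
        print(record)
```

Keeping the mapper and reducer as functions over iterables of lines makes the same logic usable both in a quick local check and in scripts that read from STDIN.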
 
mapper.py
---------
mapper.py reads data from STDIN, splits it into words, and writes one line per word to STDOUT, mapping each word to a count of 1.
 
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # emit one record per word
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))
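Because `line.split()` splits on whitespace only, quotes and punctuation stay attached to the words, which is why tokens such as `'Dear` appear in the job output later in this walkthrough. If that is not wanted, the mapper could normalize tokens before emitting them. The sketch below is one possible variation, not part of the original script; the helper names `tokenize` and `emit_records` and the choice of regular expression (lowercase letters and digits, with apostrophes kept only inside a word) are assumptions for illustration.

```python
import re

# Assumed token pattern: runs of letters/digits, allowing an apostrophe
# only between such runs (so "do't" survives but a leading quote does not).
TOKEN_PATTERN = re.compile(r"[a-z0-9]+(?:'[a-z0-9]+)*")


def tokenize(line):
    """Return the lowercased word tokens found in a line of text."""
    return TOKEN_PATTERN.findall(line.lower())


def emit_records(lines):
    """Yield records in the same format as mapper.py: word, tab, 1."""
    for line in lines:
        for word in tokenize(line):
            yield '%s\t%s' % (word, 1)
```

In a mapper script, the records could then be written with `for record in emit_records(sys.stdin): print(record)`. Note that lowercasing merges words such as `One` and `one`, which changes the counts compared with the run shown in this walkthrough.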
 
reducer.py
----------
reducer.py reads the output of mapper.py from STDIN, sums the occurrences of each word into a final count, and writes the results to STDOUT.
#!/usr/bin/env python

import sys

current_word = None
current_count = 0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    try:
        # parse the input we got from mapper.py
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        count = int(count)
    except ValueError:
        # the line had no tab or count was not a number,
        # so silently ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word is not None:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word is not None:
    print('%s\t%s' % (current_word, current_count))
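The manual bookkeeping of `current_word` and `current_count` can also be expressed with `itertools.groupby`, which groups consecutive items that share a key and therefore relies on the same sorted-input guarantee. This is an alternative sketch rather than the script used in this walkthrough; the function names `parse_records` and `reduce_counts` are hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def parse_records(lines, separator='\t'):
    """Yield (word, count) pairs, skipping malformed lines."""
    for line in lines:
        try:
            word, count = line.rstrip('\n').split(separator, 1)
            count = int(count)
        except ValueError:
            # no separator or a non-numeric count: discard the line
            continue
        yield word, count


def reduce_counts(lines):
    """Sum counts per word; the input must already be sorted by word."""
    records = parse_records(lines)
    # groupby only merges adjacent records that have the same key
    for word, group in groupby(records, key=itemgetter(0)):
        yield word, sum(count for _, count in group)
```

A reducer script built on this sketch would loop over `reduce_counts(sys.stdin)` and print each pair as `'%s\t%d' % (word, total)`.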
 
Execute Hadoop Streaming
------------------------
Ingest the input data into HDFS (William_Shakespeare.txt is provided in the attachment).
[root@myhostname wordcount]$ hadoop fs -mkdir ./streaming/python/data/input
 No encryption was performed by peer.
 No encryption was performed by peer.
[root@myhostname wordcount]$ hadoop fs -put William_Shakespeare.txt ./streaming/python/data/input/
 No encryption was performed by peer.
 No encryption was performed by peer.
 No encryption was performed by peer.
 No encryption was performed by peer.
[root@myhostname wordcount]$ hadoop fs -ls ./streaming/python/data/input/
 No encryption was performed by peer.
 No encryption was performed by peer.
Found 1 items
-rw-r--r-- 3 root bdegrp 5465130 2014-05-15 08:05 streaming/python/data/input/William_Shakespeare.txt
Place the Map/Reduce script files on a proxy server.
[root@myhostname wordcount]$ ls -l
total 716
-rw-r--r-- 1 root bdegrp 553 May 14 15:36 mapper.py
-rw-r--r-- 1 root bdegrp 1064 May 14 15:36 reducer.py
Execute the script files using the Hadoop streaming utility.
[root@myhostname wordcount]$ hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.5.0.jar \
> -input ./streaming/python/data/input/William_Shakespeare.txt \
> -output ./streaming/python/data/output/wordcount-WS \
> -file mapper.py -mapper 'python mapper.py' \
> -file reducer.py -reducer 'python reducer.py'
You can monitor the job status on the console.
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-root/hadoop-unjar8811999904063140339/] [] /tmp/streamjob7664366555498220865.jar tmpDir=null
 No encryption was performed by peer.
 No encryption was performed by peer.
 No encryption was performed by peer.
14/05/15 08:31:35 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/05/15 08:31:36 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 103944 for root on ha-hdfs:FajitaDevelopment
14/05/15 08:31:36 INFO security.TokenCache: Got dt for hdfs://FajitaDevelopment; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:FajitaDevelopment, Ident: (HDFS_DELEGATION_TOKEN token 103944 for root)
14/05/15 08:31:36 INFO mapred.FileInputFormat: Total input paths to process : 1
14/05/15 08:31:36 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-root/mapred/local]
14/05/15 08:31:36 INFO streaming.StreamJob: Running job: job_201404271309_4731
14/05/15 08:31:36 INFO streaming.StreamJob: To kill this job, run:
14/05/15 08:31:36 INFO streaming.StreamJob: UNDEF/bin/hadoop job -Dmapred.job.tracker=logicaljt -kill job_201404271309_4731
14/05/15 08:31:36 INFO streaming.StreamJob: Tracking URL: http://bdgtmaster03h1d.nam.nsroot.net:50030/jobdetails.jsp?jobid=job_201404271309_4731
14/05/15 08:31:37 INFO streaming.StreamJob: map 0% reduce 0%
14/05/15 08:31:51 INFO streaming.StreamJob: map 100% reduce 0%
14/05/15 08:31:58 INFO streaming.StreamJob: map 100% reduce 8%
14/05/15 08:31:59 INFO streaming.StreamJob: map 100% reduce 9%
14/05/15 08:32:00 INFO streaming.StreamJob: map 100% reduce 34%
14/05/15 08:32:01 INFO streaming.StreamJob: map 100% reduce 35%
14/05/15 08:32:02 INFO streaming.StreamJob: map 100% reduce 38%
14/05/15 08:32:03 INFO streaming.StreamJob: map 100% reduce 40%
14/05/15 08:32:04 INFO streaming.StreamJob: map 100% reduce 63%
14/05/15 08:32:05 INFO streaming.StreamJob: map 100% reduce 67%
14/05/15 08:32:07 INFO streaming.StreamJob: map 100% reduce 72%
14/05/15 08:32:08 INFO streaming.StreamJob: map 100% reduce 78%
14/05/15 08:32:09 INFO streaming.StreamJob: map 100% reduce 98%
14/05/15 08:32:10 INFO streaming.StreamJob: map 100% reduce 100%
14/05/15 08:32:12 INFO streaming.StreamJob: Job complete: job_201404271309_4731
14/05/15 08:32:12 INFO streaming.StreamJob: Output: ./streaming/python/data/output/wordcount-WS
Download the output data from HDFS to the local file system.
[root@myhostname wordcount]$ hadoop fs -getmerge ./streaming/python/data/output/wordcount-WS wordcount-WS.txt
 No encryption was performed by peer.
 No encryption was performed by peer.
[root@myhostname wordcount]$ ls -l
total 716
-rw-r--r-- 1 root bdegrp 553 May 14 15:36 mapper.py
-rw-r--r-- 1 root bdegrp 1064 May 14 15:36 reducer.py
-rwxr-xr-x 1 root bdegrp 721004 May 15 08:35 wordcount-WS.txt
Content of the output:
[root@myhostname wordcount]$ head -10 wordcount-WS.txt
"Defect" 1
'Dear 3
'Do't. 1
'One 3
'Puppet!' 1
'Vive 1
'Whiles 1
'mildly.' 1
'no' 3
'past 1
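
`head` shows only the first few keys, not the most frequent words. As a minimal sketch of post-processing, the snippet below ranks words by count. It assumes each line of the merged file has the form word, tab, count (the format written by reducer.py), and the function name `top_words` is hypothetical.

```python
import heapq


def top_words(lines, n=10):
    """Return the n (word, count) pairs with the highest counts."""
    pairs = []
    for line in lines:
        # split on the last tab so the count is always the final field
        word, sep, count = line.rstrip('\n').rpartition('\t')
        if not sep or not count.strip().isdigit():
            continue  # skip lines that do not match the assumed format
        pairs.append((word, int(count)))
    # nlargest returns the n best entries, highest count first
    return heapq.nlargest(n, pairs, key=lambda pair: pair[1])
```

For the file produced above, this could be called as `top_words(open('wordcount-WS.txt'), n=10)`.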
