Hadoop Streaming
----------------
Hadoop Streaming is a utility that comes with the Hadoop distribution. The utility allows developers to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer. For example:
hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.5.0.jar \
-input myInputDir \
-output myOutputDir \
-file /bin/cat -mapper cat \
-file /usr/bin/wc -reducer wc
Python - Hadoop Streaming
-------------------------
In this example, we are going to use Python scripts as the mapper and the reducer. Both mapper and reducer are executables that read input from STDIN (line by line) and emit output to STDOUT. The utility will create a Map/Reduce job, submit the job to the cluster, and monitor its progress until it completes.
mapper.py
---------
mapper.py reads data from STDIN, splits it into words, and outputs to STDOUT a list of lines mapping each word to its count.
#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
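The mapper's logic can be sanity-checked without a cluster by feeding it an in-memory stream. A minimal sketch (the sample input is made up, and Python 3 syntax is used here for convenience):

```python
import io

def run_mapper(stream):
    # Same core logic as mapper.py: strip each line, split it
    # into words, and emit tab-delimited (word, 1) pairs.
    pairs = []
    for line in stream:
        for word in line.strip().split():
            pairs.append('%s\t%s' % (word, 1))
    return pairs

# Hypothetical two-line input standing in for STDIN
sample = io.StringIO('to be or not\nto be\n')
for pair in run_mapper(sample):
    print(pair)
```

Each input word produces its own `word\t1` line; duplicates are not combined here, because that aggregation is the reducer's job.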
reducer.py
----------
reducer.py reads the output of mapper.py from STDIN, sums the occurrences of each word into a final count, and then outputs its results to STDOUT.
#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
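Because the reducer relies on Hadoop sorting map output by key, a local dry run of the whole pipeline has to insert a sort between the two stages. A minimal in-process sketch of map → sort → reduce (the sample data is made up; Python 3 syntax):

```python
from itertools import groupby

def map_words(lines):
    # mapper.py's logic: emit a (word, 1) pair for every word
    for line in lines:
        for word in line.strip().split():
            yield (word, 1)

def reduce_sorted(pairs):
    # reducer.py's logic: pairs must arrive sorted by key,
    # exactly as Hadoop's shuffle/sort phase guarantees
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield (word, sum(count for _, count in group))

sample = ['to be or not to be']
# sorted() plays the role of Hadoop's shuffle/sort here
counts = dict(reduce_sorted(sorted(map_words(sample))))
print(counts)  # {'be': 2, 'not': 1, 'or': 1, 'to': 2}
```

If the `sorted()` step is dropped, occurrences of the same word that are not adjacent get reported as separate partial counts, which is why the IF-switch in reducer.py only works on sorted input.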
Execute Hadoop Streaming
------------------------
Ingest the input data into HDFS (find William_Shakespeare.txt in the attachment).
[root@myhostname wordcount]$ hadoop fs -mkdir ./streaming/python/data/input
[root@myhostname wordcount]$ hadoop fs -put William_Shakespeare.txt ./streaming/python/data/input/
[root@myhostname wordcount]$ hadoop fs -ls ./streaming/python/data/input/
Found 1 items
-rw-r--r-- 3 root bdegrp 5465130 2014-05-15 08:05 streaming/python/data/input/William_Shakespeare.txt
Place the Map/Reduce script files on the proxy (gateway) server from which you submit jobs.
[root@myhostname wordcount]$ ls -l
total 716
-rw-r--r-- 1 root bdegrp 553 May 14 15:36 mapper.py
-rw-r--r-- 1 root bdegrp 1064 May 14 15:36 reducer.py
Execute the script files using the Hadoop Streaming utility.
[root@myhostname wordcount]$ hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.5.0.jar \
> -input ./streaming/python/data/input/William_Shakespeare.txt \
> -output ./streaming/python/data/output/wordcount-WS \
> -file mapper.py -mapper 'python mapper.py' \
> -file reducer.py -reducer 'python reducer.py'
You can monitor the job status on the console.
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-root/hadoop-unjar8811999904063140339/] [] /tmp/streamjob7664366555498220865.jar tmpDir=null
14/05/15 08:31:35 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/05/15 08:31:36 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 103944 for root on ha-hdfs:FajitaDevelopment
14/05/15 08:31:36 INFO security.TokenCache: Got dt for hdfs://FajitaDevelopment; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:FajitaDevelopment, Ident: (HDFS_DELEGATION_TOKEN token 103944 for root)
14/05/15 08:31:36 INFO mapred.FileInputFormat: Total input paths to process : 1
14/05/15 08:31:36 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-root/mapred/local]
14/05/15 08:31:36 INFO streaming.StreamJob: Running job: job_201404271309_4731
14/05/15 08:31:36 INFO streaming.StreamJob: To kill this job, run:
14/05/15 08:31:36 INFO streaming.StreamJob: UNDEF/bin/hadoop job -Dmapred.job.tracker=logicaljt -kill job_201404271309_4731
14/05/15 08:31:36 INFO streaming.StreamJob: Tracking URL: http://bdgtmaster03h1d.nam.nsroot.net:50030/jobdetails.jsp?jobid=job_201404271309_4731
14/05/15 08:31:37 INFO streaming.StreamJob: map 0% reduce 0%
14/05/15 08:31:51 INFO streaming.StreamJob: map 100% reduce 0%
14/05/15 08:31:58 INFO streaming.StreamJob: map 100% reduce 8%
14/05/15 08:31:59 INFO streaming.StreamJob: map 100% reduce 9%
14/05/15 08:32:00 INFO streaming.StreamJob: map 100% reduce 34%
14/05/15 08:32:01 INFO streaming.StreamJob: map 100% reduce 35%
14/05/15 08:32:02 INFO streaming.StreamJob: map 100% reduce 38%
14/05/15 08:32:03 INFO streaming.StreamJob: map 100% reduce 40%
14/05/15 08:32:04 INFO streaming.StreamJob: map 100% reduce 63%
14/05/15 08:32:05 INFO streaming.StreamJob: map 100% reduce 67%
14/05/15 08:32:07 INFO streaming.StreamJob: map 100% reduce 72%
14/05/15 08:32:08 INFO streaming.StreamJob: map 100% reduce 78%
14/05/15 08:32:09 INFO streaming.StreamJob: map 100% reduce 98%
14/05/15 08:32:10 INFO streaming.StreamJob: map 100% reduce 100%
14/05/15 08:32:12 INFO streaming.StreamJob: Job complete: job_201404271309_4731
14/05/15 08:32:12 INFO streaming.StreamJob: Output: ./streaming/python/data/output/wordcount-WS
Download the output data from HDFS to the local file system.
[root@myhostname wordcount]$ hadoop fs -getmerge ./streaming/python/data/output/wordcount-WS wordcount-WS.txt
[root@myhostname wordcount]$ ls -l
total 716
-rw-r--r-- 1 root bdegrp 553 May 14 15:36 mapper.py
-rw-r--r-- 1 root bdegrp 1064 May 14 15:36 reducer.py
-rwxr-xr-x 1 root bdegrp 721004 May 15 08:35 wordcount-WS.txt
Content of the output:
[root@myhostname wordcount]$ head -10 wordcount-WS.txt
"Defect" 1
'Dear 3
'Do't. 1
'One 3
'Puppet!' 1
'Vive 1
'Whiles 1
'mildly.' 1
'no' 3
'past 1