Hadoop Streaming
----------------
Hadoop streaming is a utility that comes with the Hadoop distribution. The utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer. For example:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar -input myInputDirs -output myOutputDir -mapper /bin/cat -reducer /bin/wc
How Streaming Works
-------------------
In the above example, both the mapper and the reducer are executables that read the input from stdin (line by line) and emit the output to stdout. The utility will 
create a Map/Reduce job, submit the job to an appropriate cluster, and monitor the progress of the job until it completes.
When an executable is specified for mappers, each mapper task launches the executable as a separate process when the mapper is initialized. As the mapper task runs, it converts its input into lines and feeds those lines to the stdin of the process. Meanwhile, the mapper collects the line-oriented output from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper.

By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. If there is no tab character in the line, the entire line is treated as the key and the value is null.
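The tab-splitting rule can be checked from the shell with `cut` (the sample records below are made up for illustration):

```shell
# Key = everything before the first TAB; value = the rest of the line.
# A line with no TAB becomes a key with a null value, as described above.
printf 'user1\tclick page=home\nuser2\tview\nno-tab-here\n' | cut -f1
# keys: user1, user2, no-tab-here (the whole line, since it has no TAB)
printf 'user1\tclick page=home\nuser2\tview\n' | cut -f2-
# values: "click page=home", "view"
```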
The full set of streaming options can be listed with the -info flag:
[root@dbversity.com]# hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -info
Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar [options]
Options:
 -input <path> DFS input file(s) for the Map step.
 -output <path> DFS output directory for the Reduce step.
 -mapper <cmd|JavaClassName> Optional. Command to be run as mapper.
 -combiner <cmd|JavaClassName> Optional. Command to be run as combiner.
 -reducer <cmd|JavaClassName> Optional. Command to be run as reducer.
 -file <file> Optional. File/dir to be shipped in the Job jar file.
 Deprecated. Use generic option "-files" instead.
 -inputformat <TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName>
 Optional. The input format class.
 -outputformat <TextOutputFormat(default)|JavaClassName>
 Optional. The output format class.
 -partitioner <JavaClassName> Optional. The partitioner class.
 -numReduceTasks <num> Optional. Number of reduce tasks.
 -inputreader <spec> Optional. Input recordreader spec.
 -cmdenv <n>=<v> Optional. Pass env.var to streaming commands.
 -mapdebug <cmd> Optional. To run this script when a map task fails.
 -reducedebug <cmd> Optional. To run this script when a reduce task fails.
 -io <identifier> Optional. Format to use for input to and output
 from mapper/reducer commands
 -lazyOutput Optional. Lazily create Output.
 -background Optional. Submit the job and don't wait till it completes.
 -verbose Optional. Print verbose output.
 -info Optional. Print detailed usage.
 -help Optional. Print help message.
Generic options supported are
-conf <configuration file> specify an application configuration file
-D <property=value> use value for given property
-fs <local|namenode:port> specify a namenode
-jt <local|jobtracker:port> specify a job tracker
-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
Usage tips:
In -input: globbing on <path> is supported and can have multiple -input
Default Map input format: a line is a record in UTF-8 the key part ends at first
 TAB, the rest of the line is the value
To pass a Custom input format:
 -inputformat package.MyInputFormat
Similarly, to pass a custom output format:
 -outputformat package.MyOutputFormat
The files with extensions .class and .jar/.zip, specified for the -file
 argument[s], end up in "classes" and "lib" directories respectively inside
 the working directory when the mapper and reducer are run. All other files
 specified for the -file argument[s] end up in the working directory when the
 mapper and reducer are run. The location of this working directory is
 unspecified.
To set the number of reduce tasks (num. of output files) as, say 10:
 Use -numReduceTasks 10
To skip the sort/combine/shuffle/sort/reduce step:
 Use -numReduceTasks 0
 Map output then becomes a 'side-effect output' rather than a reduce input.
 This speeds up processing. This also feels more like "in-place" processing
 because the input filename and the map input order are preserved.
 This is equivalent to -reducer NONE
To speed up the last maps:
 -D mapreduce.map.speculative=true
To speed up the last reduces:
 -D mapreduce.reduce.speculative=true
To name the job (appears in the JobTracker Web UI):
 -D mapreduce.job.name='My Job'
To change the local temp directory:
 -D dfs.data.dir=/tmp/dfs
 -D stream.tmpdir=/tmp/streaming
Additional local temp directories with -jt local:
 -D mapreduce.cluster.local.dir=/tmp/local
 -D mapreduce.jobtracker.system.dir=/tmp/system
 -D mapreduce.cluster.temp.dir=/tmp/temp
To treat tasks with non-zero exit status as SUCCEDED:
 -D stream.non.zero.exit.is.failure=false
Use a custom hadoop streaming build along with standard hadoop install:
 $HADOOP_PREFIX/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\
 [...] -D stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar
For more details about jobconf parameters see:
 http://wiki.apache.org/hadoop/JobConfFile
To set an environement variable in a streaming command:
 -cmdenv EXAMPLE_DIR=/home/example/dictionaries/
Shortcut:
 setenv HSTREAMING "$HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar"
Example: $HSTREAMING -mapper "/usr/local/bin/perl5 filter.pl"
 -file /local/filter.pl -input "/logs/0604*/*" [...]
 Ships a script, invokes the non-shipped perl interpreter. Shipped files go to
 the working directory so filter.pl is found by perl. Input files are all the
 daily logs for days in month 2006-04
[root@dbversity.com]#
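Because a streaming mapper and reducer are ordinary stdin/stdout filters, a job can be smoke-tested locally with plain pipes before submitting it to a cluster; a `sort` stands in for the shuffle phase, which groups identical keys together. A sketch using the /bin/cat and /bin/wc pair from the first example (file path is hypothetical):

```shell
# Local dry run of "cat input | mapper | sort | reducer":
# sort approximates Hadoop's shuffle; wc plays the reducer, so the
# output is line/word/character counts, as with -reducer /bin/wc.
printf 'one fish\ntwo fish\nred fish\n' > /tmp/stream_input.txt
cat /tmp/stream_input.txt | sort | wc
```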
Illustration
------------
The following map-only job (-numReduceTasks 0) streams a MongoDB BSON dump through Hadoop Streaming, using the mongo-hadoop connector's BSONFileInputFormat to read the dump files.
[root@dbversity.com]# mongo --port 27010
MongoDB shell version: 2.4.11
connecting to: 127.0.0.1:27010/test
Server has startup warnings: 
Fri Nov 7 08:48:49.460 [initandlisten] 
Fri Nov 7 08:48:49.460 [initandlisten] ** WARNING: You are running on a NUMA machine.
Fri Nov 7 08:48:49.460 [initandlisten] ** We suggest launching mongod like this to avoid performance problems:
Fri Nov 7 08:48:49.460 [initandlisten] ** numactl --interleave=all mongod [other options]
Fri Nov 7 08:48:49.460 [initandlisten] 
> show collections
system.indexes
ufo
> 
> 
bye
[root@dbversity.com]# ll -lhtr /opt/mongodb/ufo_07Nov14/test/
total 20K
-rw-r--r-- 1 root root 63 Nov 7 10:35 system.indexes.bson
-rw-r--r-- 1 root root 90 Nov 7 10:35 ufo.metadata.json
-rw-r--r-- 1 root root 12K Nov 7 10:35 ufo.bson
[root@dbversity.com]# 
[root@dbversity.com]# hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -Dmongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat -Dmongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat -input /opt/mongodb/ufo_07Nov14/test/ -output /opt/mongodb/ufo_07Nov14/ -numReduceTasks 0
14/11/07 16:18:56 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
14/11/07 16:18:56 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/11/07 16:18:56 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/11/07 16:18:56 INFO mapred.JobClient: Cleaning up the staging area file:/tmp/hadoop-root/mapred/staging/root753420450/.staging/job_local753420450_0001
14/11/07 16:18:56 ERROR security.UserGroupInformation: PriviledgedActionException as:root (auth:SIMPLE) cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/opt/mongodb/ufo_07Nov14 already exists
14/11/07 16:18:56 ERROR streaming.StreamJob: Error Launching job : Output directory file:/opt/mongodb/ufo_07Nov14 already exists
Streaming Command Failed!
[root@dbversity.com]# 
[root@dbversity.com]# 
The first run fails because the output directory already exists; streaming jobs refuse to overwrite existing output. Rerunning with a fresh output path (newdb) succeeds:
[root@dbversity.com]# hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -Dmongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat -Dmongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat -input /opt/mongodb/ufo_07Nov14/test/ -output /opt/mongodb/ufo_07Nov14/newdb -numReduceTasks 0
14/11/07 16:19:20 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
14/11/07 16:19:20 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/11/07 16:19:20 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/11/07 16:19:20 INFO mapred.FileInputFormat: Total input paths to process : 3
14/11/07 16:19:21 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/11/07 16:19:21 INFO mapred.JobClient: Running job: job_local28405872_0001
14/11/07 16:19:21 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
14/11/07 16:19:21 INFO mapred.LocalJobRunner: Waiting for map tasks
14/11/07 16:19:21 INFO mapred.LocalJobRunner: Starting task: attempt_local28405872_0001_m_000000_0
14/11/07 16:19:21 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
14/11/07 16:19:21 INFO util.ProcessTree: setsid exited with exit code 0
14/11/07 16:19:21 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@3035c7dd
14/11/07 16:19:21 INFO mapred.MapTask: Processing split: file:/opt/mongodb/ufo_07Nov14/test/ufo.bson:0+12032
14/11/07 16:19:21 WARN mapreduce.Counters: Counter name MAP_INPUT_BYTES is deprecated. Use FileInputFormatCounters as group name and BYTES_READ as counter name instead
14/11/07 16:19:21 INFO mapred.MapTask: numReduceTasks: 0
14/11/07 16:19:21 INFO mapred.Task: Task:attempt_local28405872_0001_m_000000_0 is done. And is in the process of commiting
14/11/07 16:19:21 INFO mapred.LocalJobRunner: 
14/11/07 16:19:21 INFO mapred.Task: Task attempt_local28405872_0001_m_000000_0 is allowed to commit now
14/11/07 16:19:21 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local28405872_0001_m_000000_0' to file:/opt/mongodb/ufo_07Nov14/newdb
14/11/07 16:19:21 INFO mapred.LocalJobRunner: file:/opt/mongodb/ufo_07Nov14/test/ufo.bson:0+12032
14/11/07 16:19:21 INFO mapred.Task: Task 'attempt_local28405872_0001_m_000000_0' done.
14/11/07 16:19:21 INFO mapred.LocalJobRunner: Finishing task: attempt_local28405872_0001_m_000000_0
14/11/07 16:19:21 INFO mapred.LocalJobRunner: Starting task: attempt_local28405872_0001_m_000001_0
14/11/07 16:19:21 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
14/11/07 16:19:21 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@70b3c565
14/11/07 16:19:21 INFO mapred.MapTask: Processing split: file:/opt/mongodb/ufo_07Nov14/test/ufo.metadata.json:0+90
14/11/07 16:19:21 WARN mapreduce.Counters: Counter name MAP_INPUT_BYTES is deprecated. Use FileInputFormatCounters as group name and BYTES_READ as counter name instead
14/11/07 16:19:21 INFO mapred.MapTask: numReduceTasks: 0
14/11/07 16:19:21 INFO mapred.Task: Task:attempt_local28405872_0001_m_000001_0 is done. And is in the process of commiting
14/11/07 16:19:21 INFO mapred.LocalJobRunner: 
14/11/07 16:19:21 INFO mapred.Task: Task attempt_local28405872_0001_m_000001_0 is allowed to commit now
14/11/07 16:19:21 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local28405872_0001_m_000001_0' to file:/opt/mongodb/ufo_07Nov14/newdb
14/11/07 16:19:21 INFO mapred.LocalJobRunner: file:/opt/mongodb/ufo_07Nov14/test/ufo.metadata.json:0+90
14/11/07 16:19:21 INFO mapred.Task: Task 'attempt_local28405872_0001_m_000001_0' done.
14/11/07 16:19:21 INFO mapred.LocalJobRunner: Finishing task: attempt_local28405872_0001_m_000001_0
14/11/07 16:19:21 INFO mapred.LocalJobRunner: Starting task: attempt_local28405872_0001_m_000002_0
14/11/07 16:19:21 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
14/11/07 16:19:21 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4fb3a794
14/11/07 16:19:21 INFO mapred.MapTask: Processing split: file:/opt/mongodb/ufo_07Nov14/test/system.indexes.bson:0+63
14/11/07 16:19:21 WARN mapreduce.Counters: Counter name MAP_INPUT_BYTES is deprecated. Use FileInputFormatCounters as group name and BYTES_READ as counter name instead
14/11/07 16:19:21 INFO mapred.MapTask: numReduceTasks: 0
14/11/07 16:19:21 INFO mapred.Task: Task:attempt_local28405872_0001_m_000002_0 is done. And is in the process of commiting
14/11/07 16:19:21 INFO mapred.LocalJobRunner: 
14/11/07 16:19:21 INFO mapred.Task: Task attempt_local28405872_0001_m_000002_0 is allowed to commit now
14/11/07 16:19:21 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local28405872_0001_m_000002_0' to file:/opt/mongodb/ufo_07Nov14/newdb
14/11/07 16:19:21 INFO mapred.LocalJobRunner: file:/opt/mongodb/ufo_07Nov14/test/system.indexes.bson:0+63
14/11/07 16:19:21 INFO mapred.Task: Task 'attempt_local28405872_0001_m_000002_0' done.
14/11/07 16:19:21 INFO mapred.LocalJobRunner: Finishing task: attempt_local28405872_0001_m_000002_0
14/11/07 16:19:21 INFO mapred.LocalJobRunner: Map task executor complete.
14/11/07 16:19:22 INFO mapred.JobClient: map 100% reduce 0%
14/11/07 16:19:22 INFO mapred.JobClient: Job complete: job_local28405872_0001
14/11/07 16:19:22 INFO mapred.JobClient: Counters: 14
14/11/07 16:19:22 INFO mapred.JobClient: File System Counters
14/11/07 16:19:22 INFO mapred.JobClient: FILE: Number of bytes read=345588
14/11/07 16:19:22 INFO mapred.JobClient: FILE: Number of bytes written=616611
14/11/07 16:19:22 INFO mapred.JobClient: FILE: Number of read operations=0
14/11/07 16:19:22 INFO mapred.JobClient: FILE: Number of large read operations=0
14/11/07 16:19:22 INFO mapred.JobClient: FILE: Number of write operations=0
14/11/07 16:19:22 INFO mapred.JobClient: Map-Reduce Framework
14/11/07 16:19:22 INFO mapred.JobClient: Map input records=3
14/11/07 16:19:22 INFO mapred.JobClient: Map output records=3
14/11/07 16:19:22 INFO mapred.JobClient: Input split bytes=308
14/11/07 16:19:22 INFO mapred.JobClient: Spilled Records=0
14/11/07 16:19:22 INFO mapred.JobClient: CPU time spent (ms)=0
14/11/07 16:19:22 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
14/11/07 16:19:22 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
14/11/07 16:19:22 INFO mapred.JobClient: Total committed heap usage (bytes)=850919424
14/11/07 16:19:22 INFO mapred.JobClient: org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
14/11/07 16:19:22 INFO mapred.JobClient: BYTES_READ=12185
14/11/07 16:19:22 INFO streaming.StreamJob: Output directory: /opt/mongodb/ufo_07Nov14/newdb
[root@dbversity.com]# 
[root@dbversity.com]# 
[root@dbversity.com]# ll -lhtr /opt/mongodb/ufo_07Nov14/newdb/
total 20K
-rwxr-xr-x 1 root root 12K Nov 7 16:19 part-00000
-rwxr-xr-x 1 root root 93 Nov 7 16:19 part-00001
-rwxr-xr-x 1 root root 66 Nov 7 16:19 part-00002
-rwxr-xr-x 1 root root 0 Nov 7 16:19 _SUCCESS
[root@dbversity.com]#
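The empty _SUCCESS marker in the listing above is written by FileOutputCommitter once every task has committed, so a consumer script can use it to confirm the job finished before reading the part files. A sketch of that check (the directory setup below only simulates a completed job):

```shell
# A committed streaming job leaves an empty _SUCCESS file alongside
# the part-xxxxx files; test for it before consuming the output.
OUT=/tmp/stream_demo_out                 # hypothetical output directory
mkdir -p "$OUT" && : > "$OUT/_SUCCESS"   # simulate a completed job
[ -f "$OUT/_SUCCESS" ] && echo "job completed" || echo "incomplete"
```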