Hadoop

"Hadoop is a Free Java software framework that supports distributed applications running on large clusters of commodity computers that process huge amounts of data." -- Wikipedia

Hadoop is a distributed parallel computing framework. It implements MapReduce on top of the Hadoop Distributed File System (HDFS). Google has its own implementation of MapReduce. Many big companies (Google, Yahoo, Ask.com, Amazon, PowerSet... not sure about MS) use MapReduce to process data at large scale. The data can be as large as the whole web. One application is building an inverted index of the web.

It's best to start with this classic MapReduce paper.

Here is a diagram from the paper. All your input files live on the Hadoop Distributed File System (HDFS). Each mapper runs on a small part of the input and, for each record (you define what a record is), generates key/value pairs. All pairs with the same key are sent to the same reducer, so you can process them together and generate key/value pairs as the final output. Mappers run independently of one another, and so do reducers, so they can run in parallel. A reducer, however, cannot finish until all mappers have finished. If you scroll down you'll find sample Java code to count word frequency.
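The map, group-by-key, and reduce steps described above can be sketched in plain Java without a cluster. This is only an in-memory illustration: the class MapReduceSketch and its methods map, shuffle, reduce and run are hypothetical names chosen for this sketch and are not part of the Hadoop API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration; none of these names come from Hadoop.
class MapReduceSketch {

    // Map phase: one record (a line of text) in, zero or more (word, 1) pairs out.
    static List<Map.Entry<String, Integer>> map(String record) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : record.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // Shuffle: group all values under their key, so that a single reduce call
    // sees every value emitted for that key.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce phase: all values for one key in, one summed count out.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Runs the three phases one after another over a list of records.
    static Map<String, Integer> run(List<String> records) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String record : records) {
            pairs.addAll(map(record)); // on a cluster, these calls are spread over many mappers
        }
        Map<String, Integer> output = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffle(pairs).entrySet()) {
            output.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return output;
    }

    public static void main(String[] args) {
        // prints {be=2, not=1, or=1, to=2}
        System.out.println(run(Arrays.asList("to be or not", "to be")));
    }
}
```

Note that reduce cannot be called for a key until every record has been mapped, which mirrors the dependency of reducers on mappers.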

Hadoop Streaming

"Hadoop streaming is a utility that comes with the Hadoop distribution. The utility allows you to create and run map/reduce jobs with any executable or script as the mapper and/or the reducer..."

So you don't have to write Java code to start enjoying the power of Hadoop. For example, you can use "grep abc" as the mapper and "cat" as the reducer to grep with Hadoop.
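To make the roles of the two commands concrete, here is a rough in-memory stand-in written in Java. It assumes a literal pattern such as "abc", and it approximates the sort that happens between map and reduce by sorting whole lines. GrepSketch and its methods are hypothetical names for this sketch, not Hadoop or streaming APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory stand-in for a streaming job; not Hadoop code.
class GrepSketch {

    // Mapper: behaves like `grep abc` for a literal pattern,
    // passing through only the lines that contain it.
    static List<String> mapper(List<String> lines, String pattern) {
        List<String> matched = new ArrayList<>();
        for (String line : lines) {
            if (line.contains(pattern)) {
                matched.add(line);
            }
        }
        return matched;
    }

    // Reducer: behaves like `cat`, copying its input to its output unchanged.
    static List<String> reducer(List<String> lines) {
        return new ArrayList<>(lines);
    }

    // Mapper, then a sort (assumption: whole-line sort as a simplification
    // of the sort-by-key between map and reduce), then reducer.
    static List<String> run(List<String> lines, String pattern) {
        List<String> mapped = mapper(lines, pattern);
        Collections.sort(mapped);
        return reducer(mapped);
    }

    public static void main(String[] args) {
        // prints [abcd, zabc]
        System.out.println(run(Arrays.asList("zabc", "def", "abcd"), "abc"));
    }
}
```

All the real work happens in the mapper; the reducer only passes the matching lines through.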

Here is an example that counts word frequency:

In the mapper, replace every run of whitespace with a newline, so that each word is on its own line: " perl -p -e 's/\s+/\n/g' "
In the reducer, all occurrences of the same word arrive at the same reducer, grouped together, so just count them with "uniq -c".
It's similar to running "cat filename | perl -p -e 's/\s+/\n/g' | sort | uniq -c " on a single machine.
The sample hadoop command line is:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper "perl -p -e 's/\s+/\n/g'" \
    -reducer "uniq -c"
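The reducer relies on identical words being adjacent, because "uniq -c" only counts runs of equal neighbouring lines. The following Java sketch imitates the two commands in memory to show why the sort matters. UniqCountSketch, splitWords and uniqCount are hypothetical names. Unlike the perl one-liner, the sketch drops empty tokens, and unlike the real uniq it does not pad the counts with leading spaces.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical imitation of the streaming mapper and reducer; not Hadoop code.
class UniqCountSketch {

    // Roughly what the perl mapper does: one word per output line.
    // Assumption: empty tokens are dropped to keep the example simple.
    static List<String> splitWords(String text) {
        List<String> words = new ArrayList<>();
        for (String word : text.split("\\s+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    // Roughly what `uniq -c` does: count each run of identical adjacent lines.
    static List<String> uniqCount(List<String> lines) {
        List<String> out = new ArrayList<>();
        String current = null;
        int count = 0;
        for (String line : lines) {
            if (line.equals(current)) {
                count++;
            } else {
                if (current != null) {
                    out.add(count + " " + current);
                }
                current = line;
                count = 1;
            }
        }
        if (current != null) {
            out.add(count + " " + current);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = splitWords("the cat and the hat");
        // Unsorted, the two "the" lines are not adjacent and are counted separately:
        // prints [1 the, 1 cat, 1 and, 1 the, 1 hat]
        System.out.println(uniqCount(words));
        Collections.sort(words);
        // Sorted, as reducer input is: prints [1 and, 1 cat, 1 hat, 2 the]
        System.out.println(uniqCount(words));
    }
}
```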

Hadoop Streaming Tutorial


Sample Java Code To Count Word Frequency

In this example, you only need to customize MapClass and Reduce, plus the marked section at the top of main(). The rest of main() normally needs no changes.

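import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**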
* This is an example Hadoop Map/Reduce application.
* It reads the text input files, breaks each line into words
* and counts them. The output is a locally sorted list of words and the 
* count of how often they occurred.
*
* To run: bin/hadoop jar build/hadoop-examples.jar wordcount
*            [-m maps] [-r reduces] in-dir out-dir 
*
* @author Owen O'Malley
*/
public class WordCount {
        
        /**
        * Counts the words in each line.
        * For each line of input, break the line into words and emit them as
        * (word, 1).
        * This is what you need to customize.
        */
        public static class MapClass extends MapReduceBase implements Mapper {
                
                private final static IntWritable one = new IntWritable(1);
                private Text word = new Text();
                
                public void map(WritableComparable key, Writable value, 
                OutputCollector output, 
                Reporter reporter) throws IOException {
                        String line = ((Text)value).toString();
                        StringTokenizer itr = new StringTokenizer(line);
                        while (itr.hasMoreTokens()) {
                                word.set(itr.nextToken());
                                output.collect(word, one);
                        }
                }
        }
        
        /**
        * A reducer class that just emits the sum of the input values.
        * This is what you need to customize.
        */
        public static class Reduce extends MapReduceBase implements Reducer {
                
                public void reduce(WritableComparable key, Iterator values,
                OutputCollector output, 
                Reporter reporter) throws IOException {
                        int sum = 0;
                        while (values.hasNext()) {
                                sum += ((IntWritable) values.next()).get();
                        }
                        output.collect(key, new IntWritable(sum));
                }
        }
        
        static void printUsage() {
                System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
                System.exit(1);
        }
        
        /**
        * The main driver for word count map/reduce program.
        * Invoke this method to submit the map/reduce job.
        * @throws IOException When there are communication problems with the 
        *                     job tracker.
        */
        public static void main(String[] args) throws IOException {
                JobConf conf = new JobConf(WordCount.class);
                conf.setJobName("wordcount");
                
                //    This is what you need to customize.

                // the keys are words (strings)
                conf.setOutputKeyClass(Text.class); 
                // the values are counts (ints)
                conf.setOutputValueClass(IntWritable.class);
                
                conf.setMapperClass(MapClass.class);        
                conf.setCombinerClass(Reduce.class);
                conf.setReducerClass(Reduce.class);
                //   Normally do not need to modify below this line.
                
                List other_args = new ArrayList();
                for(int i=0; i < args.length; ++i) {
                        try {
                                if ("-m".equals(args[i])) {
                                        conf.setNumMapTasks(Integer.parseInt(args[++i]));
                                } else if ("-r".equals(args[i])) {
                                        conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                                } else {
                                        other_args.add(args[i]);
                                }
                        } catch (NumberFormatException except) {
                                System.out.println("ERROR: Integer expected instead of " + args[i]);
                                printUsage();
                        } catch (ArrayIndexOutOfBoundsException except) {
                                System.out.println("ERROR: Required parameter missing from " +
                                args[i-1]);
                                printUsage(); // exits

                        }
                }
                // Make sure there are exactly 2 parameters left.
                if (other_args.size() != 2) {
                        System.out.println("ERROR: Wrong number of parameters: " +
                        other_args.size() + " instead of 2.");
                        printUsage();
                }
                conf.setInputPath(new Path((String) other_args.get(0)));
                conf.setOutputPath(new Path((String) other_args.get(1)));
                
                // Uncomment to run locally in a single process
                // conf.set("mapred.job.tracker", "local");
                
                JobClient.runJob(conf);
        }
        
}
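In main(), the Reduce class is registered both as the combiner and as the reducer. A combiner pre-aggregates the output of a mapper before it is sent to the reducers. Reusing Reduce for this is safe here because addition is associative and commutative, so summing partial sums gives the same total. The following sketch checks that property in plain Java; CombinerSketch and sum are hypothetical names and no Hadoop classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of why a summing reducer can double as a combiner.
class CombinerSketch {

    // Same logic as the Reduce class: add up all the counts for one key.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    // Without a combiner: every (word, 1) pair reaches the reducer.
    static int withoutCombiner(List<List<Integer>> perMapperCounts) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> counts : perMapperCounts) {
            all.addAll(counts);
        }
        return sum(all);
    }

    // With a combiner: each mapper's counts are summed first,
    // and the reducer only adds up one partial sum per mapper.
    static int withCombiner(List<List<Integer>> perMapperCounts) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> counts : perMapperCounts) {
            partials.add(sum(counts));
        }
        return sum(partials);
    }

    public static void main(String[] args) {
        // Counts emitted for one word by two mappers.
        List<List<Integer>> counts = Arrays.asList(
                Arrays.asList(1, 1, 1),
                Arrays.asList(1, 1));
        System.out.println(withoutCombiner(counts)); // prints 5
        System.out.println(withCombiner(counts));    // prints 5
    }
}
```

A reducer whose result depends on seeing all values at once, such as one that computes an average directly, could not be reused as a combiner in this way.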
