5. MapReduceの説明
次にMapReduceの説明に入ります。MapReduceという名前は、処理が「Mapフェーズ」と「Reduceフェーズ」という2つのフェーズで構成されることに由来しています。
Mapフェーズは元データから情報を抽出する処理、Reduceフェーズは抽出された情報を集約する処理を行います。プログラマはMapフェーズ・Reduceフェーズのロジックを記述するだけで、MapReduceフレームワークが入力の分割やMap・Reduceの分散実行などの面倒を見ます。
MapReduceフレームワークでは[key, val]
のペアが重要になります。MapReduceプログラムに[key, val]
のリストを入力すると、MapフェーズとReduceフェーズを経て、出力として[key', val']
のリストが得られます。
Mapフェーズでは入力の[k1, v1]
のペアに対して何らかの処理を行い、新しく[k2, v2]
というペアを出力します。
Mapフェーズが終了すると、出力されたペアが同じキーごとにまとめられます。つまり[k2, [v2, ...]]
というように、k2
というキーを持つ値がまとめ上げられます。これがReduceフェーズに渡されます。Reduceフェーズではこれを処理し、最終的に[k3, v3]
を出力します。
先ほどサンプルで使用したWordCountプログラムのソースを見るのが分かりやすいので、実際にそのコードを見ていくことにします。上の説明と見比べながら動作を把握してください。ソースコードは src/examples/org/apache/hadoop/examples/WordCount.java にあります。MapClass
、Reduce
がそれぞれMapフェーズ、Reduceフェーズに対応するクラスです。
MapClass
のコードは次のとおりです。
/**
* Counts the words in each line.
* For each line of input, break the line into words and emit them as
* (<b>word</b>, <b>1</b>).
*/ public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { // 単語に分解 word.set(itr.nextToken()); output.collect(word, one); // [word, 1]を出力 } } }
MapReduceBase
クラスを継承し、Mapper
インターフェイスを実装しています。肝となるのはmap
関数です。引数のvalue
には入力ファイルの内容が入っています。これをStringTokenizer
を用いて単語に分割します(変数word
)。そして、[word, 1]
というKey-Valueペアを出力(output.collect
)しています。"hoge hoge hoge fuga fuga"
というテキストを入力に取った場合、次のような5つのKey-Valueペアが出力されます。
[hoge, 1] [hoge, 1] [hoge, 1] [fuga, 1] [fuga, 1]
出力されたペアはReduce
に渡される前に次のようにキーごとにまとめられます。
[hoge, [1, 1, 1]] [fuga, [1, 1]]
次にこれらのペアがReduce
に渡されます。Reduce
のコードは次のとおりです。
/**
* A reducer class that just emits the sum of the input values.
*/ public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); // values.next().get()はこの場合必ず1 } output.collect(key, new IntWritable(sum)); // [word, 出現回数]を出力 } }
このクラスの場合、単語とvalueの総和を最後に出力しています。Mapフェーズでvalueには全て1を出力したので、valuesの総和は単語が出現した回数に一致します。この例の場合、次のようなペアが最終的に出力されます。
[hoge, 3] [fuga, 2]
各クラスやインターフェイスの内容はごっそり省きましたが、MapReduceの概要は掴めていただけましたでしょうか。他にも分散grepや分散数独Solverなどのサンプルもありますので、ぜひ動かして遊んでみてください。さらに進んで自分でMapReduceのプログラムを書いてみたいという方は、Hadoop MapReduceのチュートリアルが参考になるでしょう。
チュートリアルの後はAPIやサンプルプログラムのソースコードを参考に、独自のMapReduceプログラムを書いてみてください。
MapReduceが作られた背景
GoogleがMapReduceを作った背景として、今まで分散計算用のプログラムを書いたことがない人でも簡単にWeb規模のデータ処理が行えるようなフレームワークを構築するというのがまず挙げられます。通常、何万台ものマシンを使用して効率的なプログラムを記述するためには、通信の量を削減したりマシンが故障した時にも計算を正しく行えるようにするなど、プログラミングの本質的ではない所にばかり時間が浪費されます。これをうまくフレームワーク側で負担してくれ、かつ分かりやすいプログラミングモデルであったということが、Google社内でMapReduceが使われるようになった一因と思われます。
6. まとめ
1台にHadoopをインストールして、実際にサンプルプログラムを走らせてみました。またHDFSのアーキテクチャとMapReduceについても説明しました。
次回は複数台にHadoopをインストールして動かし、そのスケーラビリティを確認したいと思います。