Apache Sparkは近年注目されている高速な分散処理エンジンで、ScalaやPythonなどを使って簡単にコードを記述できるということでも注目されています。
チュートリアル4:Apache Sparkを使ってバスケット分析を行う
第1回の「チュートリアル1」でインポートした注文テーブルを元に、「一緒に購入される頻度の高い商品の組み合わせ」を算出してみます。
まず、Spark Shellを起動します。
$ spark-shell --jars /usr/lib/avro/avro-mapred.jar \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
起動すると、しばらくログが流れた後、以下のようなプロンプトが表示されます。表示されない場合はリターンキーを押してみてください。
scala>
これで、Spark Shellを起動できました。
まず、入力データを登録します。以下のコマンドを実行してみてください。
import org.apache.avro.generic.GenericRecord import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper} import org.apache.hadoop.io.NullWritable val warehouse = "hdfs://quickstart.cloudera/user/hive/warehouse/" val order_items_path = warehouse + "order_items" val order_items = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](order_items_path) val products_path = warehouse + "products" val products = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](products_path)
実行すると、以下のような出力が表示されます。
5/15 04:43:36 INFO MemoryStore: ensureFreeSpace(259846) called with curMem=0, maxMem=278302556 15/05/15 04:43:36 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 253.8 KB, free 265.2 MB) 15/05/15 04:43:36 INFO MemoryStore: ensureFreeSpace(21134) called with curMem=259846, maxMem=278302556 15/05/15 04:43:36 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.6 KB, free 265.1 MB) 15/05/15 04:43:36 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.100.126:37760 <http://192.168.100.126:37760> (size: 20.6 KB, free: 265.4 MB) 15/05/15 04:43:36 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/05/15 04:43:36 INFO SparkContext: Created broadcast 0 from hadoopFile at <console>:19 15/05/15 04:43:36 INFO MemoryStore: ensureFreeSpace(259894) called with curMem=280980, maxMem=278302556 15/05/15 04:43:36 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 253.8 KB, free 264.9 MB) 15/05/15 04:43:36 INFO MemoryStore: ensureFreeSpace(21134) called with curMem=540874, maxMem=278302556 15/05/15 04:43:36 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.6 KB, free 264.9 MB) 15/05/15 04:43:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.100.126:37760 <http://192.168.100.126:37760> (size: 20.6 KB, free: 265.4 MB) 15/05/15 04:43:36 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/05/15 04:43:36 INFO SparkContext: Created broadcast 1 from hadoopFile at <console>:22 import org.apache.avro.generic.GenericRecord import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper} import org.apache.hadoop.io.NullWritable warehouse: String = hdfs://quickstart.cloudera/user/hive/warehouse/ order_items_path: String = hdfs://quickstart.cloudera/user/hive/warehouse/order_items order_items: org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroWrapper[org.apache.avr o.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] = hdfs://quickstart.cloudera/user/hive/warehouse/order_items HadoopRDD[0] at hadoopFile at <console>:19 products_path: String = hdfs://quickstart.cloudera/user/hive/warehouse/products products: org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroWrapper[org.apache.avr o.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] = hdfs://quicks...
次に、注文テーブルと商品テーブルを元に、注文単位でグルーピングした商品とその数量のリストを取得します。
val orders = order_items.map { x => ( x._1.datum.get("order_item_product_id"), (x._1.datum.get("order_item_order_id"), x._1.datum.get("order_item_quantity"))) }.join( products.map { x => ( x._1.datum.get("product_id"), (x._1.datum.get("product_name"))) } ).map(x => ( scala.Int.unbox(x._2._1._1), // order_id ( scala.Int.unbox(x._2._1._2), // quantity x._2._2.toString // product_name ) )).groupByKey()
実行すると、以下のような出力が表示されます。
15/05/15 04:45:11 INFO FileInputFormat: Total input paths to process : 1 15/05/15 04:45:11 INFO FileInputFormat: Total input paths to process : 1 orders: org.apache.spark.rdd.RDD[(Int, Iterable[(Int, String)])] = ShuffledRDD[8] at groupByKey at <console>:39
この段階ではまだ分散処理は実行されていません。
最後に、同一注文内でのそれらの商品の組み合わせから、組み合わせの頻度を算出し、高頻度の組み合わせ上位10種を出力します。
val cooccurrences = orders.map(order => ( order._1, order._2.toList.combinations(2).map(order_pair => ( if (order_pair(0)._2 < order_pair(1)._2) (order_pair(0)._2, order_pair(1)._2) else (order_pair(1)._2, order_pair(0)._2), order_pair(0)._1 * order_pair(1)._1 ) ) ) ) val combos = cooccurrences.flatMap(x => x._2).reduceByKey((a, b) => a + b) val mostCommon = combos.map(x => (x._2, x._1)).sortByKey(false).take(10) println(mostCommon.deep.mkString("\n"))
実行すると、以下のような出力が表示されます。長いので省略しています。
15/05/15 04:47:00 INFO SparkContext: Starting job: sortByKey at <console>:39 15/05/15 04:47:00 INFO DAGScheduler: Registering RDD 3 (map at <console>:29) 15/05/15 04:47:00 INFO DAGScheduler: Registering RDD 2 (map at <console>:25) 15/05/15 04:47:00 INFO DAGScheduler: Registering RDD 7 (map at <console>:33) 15/05/15 04:47:00 INFO DAGScheduler: Registering RDD 10 (flatMap at <console>:38) 15/05/15 04:47:00 INFO DAGScheduler: Got job 0 (sortByKey at <console>:39) with 2 output partitions (allowLocal=false) 15/05/15 04:47:00 INFO DAGScheduler: Final stage: Stage 4(sortByKey at <console>:39) (中略) 15/05/15 04:47:10 INFO DAGScheduler: Stage 10 (take at <console>:39) finished in 0.131 s 15/05/15 04:47:10 INFO DAGScheduler: Job 1 finished: take at <console>:39, took 0.383846 s (67876,(Nike Men's Dri-FIT Victory Golf Polo,Perfect Fitness Perfect Rip Deck)) (62924,(O'Brien Men's Neoprene Life Vest,Perfect Fitness Perfect Rip Deck)) (54399,(Nike Men's Dri-FIT Victory Golf Polo,O'Brien Men's Neoprene Life Vest)) (39656,(Nike Men's Free 5.0+ Running Shoe,Perfect Fitness Perfect Rip Deck)) (39314,(Perfect Fitness Perfect Rip Deck,Perfect Fitness Perfect Rip Deck)) (35092,(Perfect Fitness Perfect Rip Deck,Under Armour Girls' Toddler Spine Surge Runni)) (33750,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Free 5.0+ Running Shoe)) (33406,(Nike Men's Free 5.0+ Running Shoe,O'Brien Men's Neoprene Life Vest)) (29835,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Dri-FIT Victory Golf Polo)) (29342,(Nike Men's Dri-FIT Victory Golf Polo,Under Armour Girls' Toddler Spine Surge Runni)) cooccurrences: org.apache.spark.rdd.RDD[(Int, Iterator[((String, String), Int)])] = MappedRDD[9] at map at <console>:27 combos: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[11] at reduceByKey at <console>:38 mostCommon: Array[(Int, (String, String))] = Array((67876,(Nike Men's Dri-FIT Victory Golf Polo,Perfect Fitness Perfect Rip Deck)), (62924,(O'Brien Men's Neoprene Life Vest,Perfect Fitness Perfect Rip Deck)), (54399,(Nike Men's Dri-FIT Victory Golf Polo,O'Brien Men's Neoprene Life Vest)), (39656,(Nike Men's Free 5.0+ Running Shoe,Perfect Fitness Perfect Rip Deck)), (39314,(Perfect Fitness Perfect Rip Deck,Perfect Fitness Perfect Rip Deck)), (35092,(Perfect Fitness Perfect Rip Deck,Under Armour Girls' Toddler Spine Surge Runni)), (33750,(Nike Men's Dri-FIT Victory ... 15/05/15 04:47:10 INFO TaskSetManager: Finished task 0.0 in stage 10.0 (TID 12) in 123 ms on 192.168.100.126 (1/1) 15/05/15 04:47:10 INFO TaskSchedulerImpl: Removed TaskSet 10.0, whose tasks have all completed, from pool
この、最後に表示された以下の部分が、最終的な出力結果になります。
(67876,(Nike Men's Dri-FIT Victory Golf Polo,Perfect Fitness Perfect Rip Deck)) (62924,(O'Brien Men's Neoprene Life Vest,Perfect Fitness Perfect Rip Deck)) (54399,(Nike Men's Dri-FIT Victory Golf Polo,O'Brien Men's Neoprene Life Vest)) (39656,(Nike Men's Free 5.0+ Running Shoe,Perfect Fitness Perfect Rip Deck)) (39314,(Perfect Fitness Perfect Rip Deck,Perfect Fitness Perfect Rip Deck)) (35092,(Perfect Fitness Perfect Rip Deck,Under Armour Girls' Toddler Spine Surge Runni)) (33750,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Free 5.0+ Running Shoe)) (33406,(Nike Men's Free 5.0+ Running Shoe,O'Brien Men's Neoprene Life Vest)) (29835,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Dri-FIT Victory Golf Polo)) (29342,(Nike Men's Dri-FIT Victory Golf Polo,Under Armour Girls' Toddler Spine Surge Runni))
この結果から、「Nike Men's Dri-FIT Victory Golf Polo」という商品と「Perfect Fitness Perfect Rip Deck」という商品の組み合わせが最も多い、つまり最も頻繁に組み合わせて購入されるということがわかりました。
JavaのMapReduceに慣れている人は、あまりに簡素にコードを書けることに驚くでしょう。
逆に、分散環境でない普通のプログラミングに慣れている人は、今まで自分が書き慣れていたコードと何ら変わらないことに気づくでしょう。
今回はコマンドラインのシェルからコードを入力しましたが、Hueの新しいバージョンではSpark用の新しいツールが導入される予定です。
今回のチュートリアルでは扱っていませんが、Apache Sparkは蓄積したデータの処理だけではなく、ストリーミング処理を行うことができます。例えば数秒ごとにログデータを取り込んで処理をする、という用途に利用することもできます。
次回の最終回では、Cloudera Searchを使ってサーバログの全文ログ検索を行います。また、ボーナスステージとして、Hueで検索結果のダッシュボードを作成し、全文ログの検索結果をさまざまなグラフを使って可視化します。お楽しみに!