SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

「Cloudera Quick Start VM」を活用したビッグデータ分析の学習環境構築

フリーのビッグデータ分析学習環境を使って、Apache Sparkによるバスケット分析を行ってみる

「Cloudera Quick Start VM」を活用したビッグデータ分析の学習環境構築 第3回

  • このエントリーをはてなブックマークに追加

 前回の記事では、Impalaを使って、構造化データ(RDBMSからインポートしたデータ)の分析と、Hiveを使用して非構造化データ(アクセスログ)の分析を行いました。今回は、Apache Sparkを使って簡単なバスケット分析を行ってみましょう。

  • このエントリーをはてなブックマークに追加

 Apache Sparkは近年注目されている高速な分散処理エンジンで、ScalaやPythonなどを使って簡単にコードを記述できるということでも注目されています。

チュートリアル4:Apache Sparkを使ってバスケット分析を行う

 第1回の「チュートリアル1」でインポートした注文テーブルを元に、「一緒に購入される頻度の高い商品の組み合わせ」を算出してみます。

 まず、Spark Shellを起動します。

$ spark-shell --jars /usr/lib/avro/avro-mapred.jar \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

 起動すると、しばらくログが流れた後、以下のようなプロンプトが表示されます。表示されない場合はリターンキーを押してみてください。

scala>

 これで、Spark Shellを起動できました。

 まず、入力データを登録します。以下のコマンドを実行してみてください。

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

val warehouse = "hdfs://quickstart.cloudera/user/hive/warehouse/"

val order_items_path = warehouse + "order_items"
val order_items = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](order_items_path)

val products_path = warehouse + "products"
val products = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](products_path)

 実行すると、以下のような出力が表示されます。

5/15 04:43:36 INFO MemoryStore: ensureFreeSpace(259846) called with
curMem=0, maxMem=278302556
15/05/15 04:43:36 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 253.8 KB, free 265.2 MB)
15/05/15 04:43:36 INFO MemoryStore: ensureFreeSpace(21134) called with
curMem=259846, maxMem=278302556
15/05/15 04:43:36 INFO MemoryStore: Block broadcast_0_piece0 stored as
bytes in memory (estimated size 20.6 KB, free 265.1 MB)
15/05/15 04:43:36 INFO BlockManagerInfo: Added broadcast_0_piece0 in
memory on 192.168.100.126:37760 <http://192.168.100.126:37760> (size: 20.6
KB, free: 265.4 MB)
15/05/15 04:43:36 INFO BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/05/15 04:43:36 INFO SparkContext: Created broadcast 0 from hadoopFile
at <console>:19
15/05/15 04:43:36 INFO MemoryStore: ensureFreeSpace(259894) called with
curMem=280980, maxMem=278302556
15/05/15 04:43:36 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 253.8 KB, free 264.9 MB)
15/05/15 04:43:36 INFO MemoryStore: ensureFreeSpace(21134) called with
curMem=540874, maxMem=278302556
15/05/15 04:43:36 INFO MemoryStore: Block broadcast_1_piece0 stored as
bytes in memory (estimated size 20.6 KB, free 264.9 MB)
15/05/15 04:43:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in
memory on 192.168.100.126:37760 <http://192.168.100.126:37760> (size: 20.6
KB, free: 265.4 MB)
15/05/15 04:43:36 INFO BlockManagerMaster: Updated info of block
broadcast_1_piece0
15/05/15 04:43:36 INFO SparkContext: Created broadcast 1 from hadoopFile
at <console>:22
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
warehouse: String = hdfs://quickstart.cloudera/user/hive/warehouse/
order_items_path: String =
hdfs://quickstart.cloudera/user/hive/warehouse/order_items
order_items: 
org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroWrapper[org.apache.avr
o.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] =
hdfs://quickstart.cloudera/user/hive/warehouse/order_items HadoopRDD[0] at
hadoopFile at <console>:19
products_path: String =
hdfs://quickstart.cloudera/user/hive/warehouse/products
products: 
org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroWrapper[org.apache.avr
o.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] =
hdfs://quicks...

 次に、注文テーブルと商品テーブルを元に、注文単位でグルーピングした商品とその数量のリストを取得します。

val orders = order_items.map { x => (
   x._1.datum.get("order_item_product_id"),
   (x._1.datum.get("order_item_order_id"), x._1.datum.get("order_item_quantity")))
}.join(
 products.map { x => (
   x._1.datum.get("product_id"),
   (x._1.datum.get("product_name")))
 }
).map(x => (
   scala.Int.unbox(x._2._1._1), // order_id
   (
       scala.Int.unbox(x._2._1._2), // quantity
       x._2._2.toString // product_name
   )
)).groupByKey()

 実行すると、以下のような出力が表示されます。

15/05/15 04:45:11 INFO FileInputFormat: Total input paths to process : 1
15/05/15 04:45:11 INFO FileInputFormat: Total input paths to process : 1
orders: org.apache.spark.rdd.RDD[(Int, Iterable[(Int, String)])] =
ShuffledRDD[8] at groupByKey at <console>:39

 この段階ではまだ分散処理は実行されていません。

 最後に、同一注文内でのそれらの商品の組み合わせから、組み合わせの頻度を算出し、高頻度の組み合わせ上位10種を出力します。

val cooccurrences = orders.map(order =>
 (
   order._1,
   order._2.toList.combinations(2).map(order_pair =>
       (
           if (order_pair(0)._2 < order_pair(1)._2) (order_pair(0)._2, order_pair(1)._2) else (order_pair(1)._2, order_pair(0)._2),
           order_pair(0)._1 * order_pair(1)._1
       )
   )
 )
)
val combos = cooccurrences.flatMap(x => x._2).reduceByKey((a, b) => a + b)
val mostCommon = combos.map(x => (x._2, x._1)).sortByKey(false).take(10)

println(mostCommon.deep.mkString("\n"))

 実行すると、以下のような出力が表示されます。長いので省略しています。

15/05/15 04:47:00 INFO SparkContext: Starting job: sortByKey at
<console>:39
15/05/15 04:47:00 INFO DAGScheduler: Registering RDD 3 (map at
<console>:29)
15/05/15 04:47:00 INFO DAGScheduler: Registering RDD 2 (map at
<console>:25)
15/05/15 04:47:00 INFO DAGScheduler: Registering RDD 7 (map at
<console>:33)
15/05/15 04:47:00 INFO DAGScheduler: Registering RDD 10 (flatMap at
<console>:38)
15/05/15 04:47:00 INFO DAGScheduler: Got job 0 (sortByKey at <console>:39)
with 2 output partitions (allowLocal=false)
15/05/15 04:47:00 INFO DAGScheduler: Final stage: Stage 4(sortByKey at
<console>:39)
(中略)
15/05/15 04:47:10 INFO DAGScheduler: Stage 10 (take at <console>:39)
finished in 0.131 s
15/05/15 04:47:10 INFO DAGScheduler: Job 1 finished: take at <console>:39,
took 0.383846 s
(67876,(Nike Men's Dri-FIT Victory Golf Polo,Perfect Fitness Perfect Rip
Deck))
(62924,(O'Brien Men's Neoprene Life Vest,Perfect Fitness Perfect Rip Deck))
(54399,(Nike Men's Dri-FIT Victory Golf Polo,O'Brien Men's Neoprene Life
Vest))
(39656,(Nike Men's Free 5.0+ Running Shoe,Perfect Fitness Perfect Rip
Deck))
(39314,(Perfect Fitness Perfect Rip Deck,Perfect Fitness Perfect Rip Deck))
(35092,(Perfect Fitness Perfect Rip Deck,Under Armour Girls' Toddler Spine
Surge Runni))
(33750,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Free 5.0+ Running
Shoe))
(33406,(Nike Men's Free 5.0+ Running Shoe,O'Brien Men's Neoprene Life
Vest))
(29835,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Dri-FIT Victory
Golf Polo))
(29342,(Nike Men's Dri-FIT Victory Golf Polo,Under Armour Girls' Toddler
Spine Surge Runni))
cooccurrences: org.apache.spark.rdd.RDD[(Int, Iterator[((String, String),
Int)])] = MappedRDD[9] at map at <console>:27
combos: org.apache.spark.rdd.RDD[((String, String), Int)] =
ShuffledRDD[11] at reduceByKey at <console>:38
mostCommon: Array[(Int, (String, String))] = Array((67876,(Nike Men's
Dri-FIT Victory Golf Polo,Perfect Fitness Perfect Rip Deck)),
(62924,(O'Brien Men's Neoprene Life Vest,Perfect Fitness Perfect Rip
Deck)), (54399,(Nike Men's Dri-FIT Victory Golf Polo,O'Brien Men's
Neoprene Life Vest)), (39656,(Nike Men's Free 5.0+ Running Shoe,Perfect
Fitness Perfect Rip Deck)), (39314,(Perfect Fitness Perfect Rip
Deck,Perfect Fitness Perfect Rip Deck)), (35092,(Perfect Fitness Perfect
Rip Deck,Under Armour Girls' Toddler Spine Surge Runni)), (33750,(Nike
Men's Dri-FIT Victory ...
15/05/15 04:47:10 INFO TaskSetManager: Finished task 0.0 in stage 10.0
(TID 12) in 123 ms on 192.168.100.126 (1/1)
15/05/15 04:47:10 INFO TaskSchedulerImpl: Removed TaskSet 10.0, whose
tasks have all completed, from pool

 この、最後に表示された以下の部分が、最終的な出力結果になります。

(67876,(Nike Men's Dri-FIT Victory Golf Polo,Perfect Fitness Perfect Rip
Deck))
(62924,(O'Brien Men's Neoprene Life Vest,Perfect Fitness Perfect Rip Deck))
(54399,(Nike Men's Dri-FIT Victory Golf Polo,O'Brien Men's Neoprene Life
Vest))
(39656,(Nike Men's Free 5.0+ Running Shoe,Perfect Fitness Perfect Rip
Deck))
(39314,(Perfect Fitness Perfect Rip Deck,Perfect Fitness Perfect Rip Deck))
(35092,(Perfect Fitness Perfect Rip Deck,Under Armour Girls' Toddler Spine
Surge Runni))
(33750,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Free 5.0+ Running
Shoe))
(33406,(Nike Men's Free 5.0+ Running Shoe,O'Brien Men's Neoprene Life
Vest))
(29835,(Nike Men's Dri-FIT Victory Golf Polo,Nike Men's Dri-FIT Victory
Golf Polo))
(29342,(Nike Men's Dri-FIT Victory Golf Polo,Under Armour Girls' Toddler
Spine Surge Runni))

 この結果から、「Nike Men's Dri-FIT Victory Golf Polo」という商品と「Perfect Fitness Perfect Rip Deck」という商品の組み合わせが最も多い、つまり最も頻繁に組み合わせて購入されるということがわかりました。

 JavaのMapReduceに慣れている人は、あまりに簡素にコードを書けることに驚くでしょう。

 逆に、分散環境でない普通のプログラミングに慣れている人は、今まで自分が書き慣れていたコードと何ら変わらないことに気づくでしょう。

 今回はコマンドラインのシェルからコードを入力しましたが、Hueの新しいバージョンではSpark用の新しいツールが導入される予定です。

 今回のチュートリアルでは扱っていませんが、Apache Sparkは蓄積したデータの処理だけではなく、ストリーミング処理を行うことができます。例えば数秒ごとにログデータを取り込んで処理をする、という用途に利用することもできます。

 次回の最終回では、Cloudera Searchを使ってサーバログの全文ログ検索を行います。また、ボーナスステージとして、Hueで検索結果のダッシュボードを作成し、全文ログの検索結果をさまざまなグラフを使って可視化します。お楽しみに!

この記事は参考になりましたか?

  • このエントリーをはてなブックマークに追加
「Cloudera Quick Start VM」を活用したビッグデータ分析の学習環境構築連載記事一覧

もっと読む

この記事の著者

嶋内 翔(Cloudera株式会社)(シマウチ ショウ)

2011年、Clouderaの最初の日本人社員として入社。サポートエンジニアとして3年務めた後、セールスエンジニアとして働いている。監訳書に「Apache Sqoop クックブック」。ライフワークで技術系ポッドキャスティング garsue.fm のファシリテーターを務めている。

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/8653 2015/06/12 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング