SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

ゼロからはじめるSparkアプリケーション入門

Sparkの主な拡張コンポーネントの紹介と、SparkSQLを利用した簡単な分析

ゼロからはじめるSparkアプリケーション入門(2)


  • X ポスト
  • このエントリーをはてなブックマークに追加

複雑な構造のファイルとSparkSQLでのマッピング

 SparkSQLでは、Hiveと同様にMap型やArray型のカラムを宣言できます。SparkSQLでも、これらの型を利用することで、特にログファイルなどの構造化されていないデータも柔軟に対応することができます。ここでは広告の表示ログを想定して、以下の形式のデータ解析をテーマにArray型での解析を行います。

 以下は広告ログのサンプルです。

 CSVのフィールドにさらに「:」区切りで広告IDが格納されています。

広告が表示されたPageID(page_id) 表示された広告ID(ad_ids)
1 2:1:3:5
2 3:1:2:8
3 3:1:5:8
4 1:2:8:10:5
5 3:1:8:3
6 3:5
CSV
page_id,ad_ids
1,2:1:3:5
2,3:1:2:8
3,3:1:5:8
4,1:2:8:10:5
5,3:1:8:3
6,3:5

 このファイルを「ad_sample_log.csv」の名称でローカルに保存します。以後、この情報をもとに、ページごとの広告の表示回数と広告ごとの表示回数を計算していきます。

ファイルのロードとテンポラリテーブルの登録

 まずは、今回のログのスキーマを作成しています。ログはInt型のpage_idとIntの配列(Seq[Int])型として宣言しておきます。

Scala
case class AdLog(page_id:Int, ad_ids:Seq[Int]) //ログデータのスキーマ

 次にCSVファイルを読み込み、ヘッダ行以外をAdLogの形のデータフレームに変換します。前述のCSVの読み込みと違い、「:」で分割したデータをSeq[Int]型にしています。

Scala
val df = ad_log.filter( _ != ad_log_head).map( _.split(",")).map(x => AdLog(x(0).toInt,x(1).split(":").map(_.toInt).toSeq)).toDF().cache

 以下はPythonでのコードです。

Python
hiveContext = HiveContext(sc)
tran = ad_log.filter(lambda x: x != ad_log_head).map(lambda x: x.split(",")).map(lambda r: Row(page_id=int(r[0]), ad_ids=map(int, r[1].split(":"))))
df = hiveContext.createDataFrame(tran).cache()

補足

 Python側のSQLContextでは後ほど利用するlateral viewがエラーとなるため、HiveContextを使ってHive側のParserを利用するようにしています。

 作成したデータフレームをAdLogという名称のテンポラリテーブルとして登録します。

Scala
df.registerTempTable("AdLog")
Python
df.registerTempTable("AdLog")

 これでSQLを利用する準備が整いました。ここで一度、定義されたテーブルを見てみましょう。

Scala
sqlContext.sql("select * from AdLog").show

 以下はコンソール画面のキャプチャです。

img5.png
Python
hiveContext.sql("select * from AdLog").show()

補足

 ad_idsのフィールドが見慣れない表記になっています。

 AdLogの宣言時にSeqを利用することで配列でのデータの表現が可能となり、可変長のフィールドを表現することができます。この配列のフィールドは添字を使うことでSQL内からアクセス可能です。

 以下はad_idsの0番目が3のデータを抽出するSQLです。

Scala
sqlContext.sql("select * from AdLog where ad_ids[0] = 3").show

 以下はコンソール画面のキャプチャです。

img6.png

 Pythonの場合も同様です。

Python
hiveContext.sql("select * from AdLog where ad_ids[0] = 3").show()

配列の展開

 取り込んだデータの配列をSQLのLateral viewを使って展開します。

Scala
sqlContext.sql("select * from AdLog lateral view explode(ad_ids) ex_adids as ad_id ").show

 以下はコンソール画面のキャプチャです。

img7.png

 イメージがつきやすいようにad_idsのカラムを残したままなので非常に見辛いですが、 page_idに対してad_idsカラムを展開し、ad_idというカラムを作っています。

 同様のPythonコードは以下になります。

Python
hiveContext.sql("select * from AdLog LATERAL view explode(ad_ids) ex_adids as ad_id ").show()

 以下はSQLでの展開イメージになります。

展開前
page_id ad_ids
1 2,1,3,5
展開後
page_id ad_id
1 2
1 1
1 3
1 5

 lateral viewでの宣言は通常通り利用可能なので、以下のようにselect句を変更し、必要なカラムのみに絞ります。

Scala
sqlContext.sql("select page_id,ad_id from AdLog lateral view explode(ad_ids) ex_adids as ad_id ").show
img8.png

 以下はPythonのコードです。

Python
hiveContext.sql("select page_id,ad_id from AdLog lateral view explode(ad_ids) ex_adids as ad_id ").show()

 あとはpage_idごとのcountと、ad_idごとのcountを取得するだけですね。通常集計するのと同様にgroup byでそれぞれ集計します。

 以下はpage_idごとの広告の表示回数の集計です。

Scala
sqlContext.sql("select page_id,count(ad_id) from AdLog lateral view explode(ad_ids) ex_adids as ad_id group by page_id").show
img9.png

 Pythonでも集計してみます。

Python
hiveContext.sql("select page_id,count(ad_id) from AdLog lateral view explode(ad_ids) ex_adids as ad_id group by page_id").show()

 以下はad_idごとの広告の表示回数の集計です。

Scala
sqlContext.sql("select ad_id,count(page_id) from AdLog lateral view explode(ad_ids) ex_adids as ad_id group by ad_id").show
img10.png

 以下は同様のPythonコードです。

Python
hiveContext.sql("select ad_id,count(page_id) from AdLog lateral view explode(ad_ids) ex_adids as ad_id group by ad_id").show()

 ここまでSparkSQL(HiveSQL)での配列型データをlateral viewを使用して簡単な分析を行ってみました。

 Hadoop/Spark基盤をデータレイクとして機能させるために、こうした複雑なデータ形式を整形し、中間データとして保存することで、他のコンポーネントでの利用を可能にするETL処理はとても重要です。

【PR】

次のページ
MariaDBとSparkSQLの連携

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
ゼロからはじめるSparkアプリケーション入門連載記事一覧
この記事の著者

田中 裕一(日本アイ・ビー・エム株式会社)(タナカ ユウイチ)

Web系・広告系企業にて、Hadoop/Spark/Kafka等Hadoopエコシステムを利用した広告システム(DMP)・行動分析基盤・レコメンド基盤の全体アーキテクチャ設計やプログラミング、最適化、行動解析を担当。Spark/Hadoopエコシステムを筆頭にOSSを組み合わせた大規模なアーキテクチ...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/9457 2016/07/04 13:56

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング