複雑な構造のファイルとSparkSQLでのマッピング
SparkSQLでは、Hiveと同様にMap型やArray型のカラムを宣言できます。SparkSQLでも、これらの型を利用することで、特にログファイルなどの構造化されていないデータも柔軟に対応することができます。ここでは広告の表示ログを想定して、以下の形式のデータ解析をテーマにArray型での解析を行います。
以下は広告ログのサンプルです。
CSVのフィールドにさらに「:」区切りで広告IDが格納されています。
広告が表示されたPageID(page_id) | 表示された広告ID(ad_ids) |
---|---|
1 | 2:1:3:5 |
2 | 3:1:2:8 |
3 | 3:1:5:8 |
4 | 1:2:8:10:5 |
5 | 3:1:8:3 |
6 | 3:5 |
page_id,ad_ids 1,2:1:3:5 2,3:1:2:8 3,3:1:5:8 4,1:2:8:10:5 5,3:1:8:3 6,3:5
このファイルを「ad_sample_log.csv」の名称でローカルに保存します。以後、この情報をもとに、ページごとの広告の表示回数と広告ごとの表示回数を計算していきます。
ファイルのロードとテンポラリテーブルの登録
まずは、今回のログのスキーマを作成しています。ログはInt型のpage_idとIntの配列(Seq[Int])型として宣言しておきます。
case class AdLog(page_id:Int, ad_ids:Seq[Int]) //ログデータのスキーマ
次にCSVファイルを読み込み、ヘッダ行以外をAdLogの形のデータフレームに変換します。前述のCSVの読み込みと違い、「:」で分割したデータをSeq[Int]型にしています。
val df = ad_log.filter( _ != ad_log_head).map( _.split(",")).map(x => AdLog(x(0).toInt,x(1).split(":").map(_.toInt).toSeq)).toDF().cache
以下はPythonでのコードです。
hiveContext = HiveContext(sc) tran = ad_log.filter(lambda x: x != ad_log_head).map(lambda x: x.split(",")).map(lambda r: Row(page_id=int(r[0]), ad_ids=map(int, r[1].split(":")))) df = hiveContext.createDataFrame(tran).cache()
補足
Python側のSQLContextでは後ほど利用するlateral viewがエラーとなるため、HiveContextを使ってHive側のParserを利用するようにしています。
作成したデータフレームをAdLogという名称のテンポラリテーブルとして登録します。
df.registerTempTable("AdLog")
df.registerTempTable("AdLog")
これでSQLを利用する準備が整いました。ここで一度、定義されたテーブルを見てみましょう。
sqlContext.sql("select * from AdLog").show
以下はコンソール画面のキャプチャです。
hiveContext.sql("select * from AdLog").show()
補足
ad_idsのフィールドが見慣れない表記になっています。
AdLogの宣言時にSeqを利用することで配列でのデータの表現が可能となり、可変長のフィールドを表現することができます。この配列のフィールドは添字を使うことでSQL内からアクセス可能です。
以下はad_idsの0番目が3のデータを抽出するSQLです。
sqlContext.sql("select * from AdLog where ad_ids[0] = 3").show
以下はコンソール画面のキャプチャです。
Pythonの場合も同様です。
hiveContext.sql("select * from AdLog where ad_ids[0] = 3").show()
配列の展開
取り込んだデータの配列をSQLのLateral viewを使って展開します。
sqlContext.sql("select * from AdLog lateral view explode(ad_ids) ex_adids as ad_id ").show
以下はコンソール画面のキャプチャです。
イメージがつきやすいようにad_idsのカラムを残したままなので非常に見辛いですが、 page_idに対してad_idsカラムを展開し、ad_idというカラムを作っています。
同様のPythonコードは以下になります。
hiveContext.sql("select * from AdLog LATERAL view explode(ad_ids) ex_adids as ad_id ").show()
以下はSQLでの展開イメージになります。
page_id | ad_ids |
---|---|
1 | 2,1,3,5 |
page_id | ad_id |
---|---|
1 | 2 |
1 | 1 |
1 | 3 |
1 | 5 |
lateral viewでの宣言は通常通り利用可能なので、以下のようにselect句を変更し、必要なカラムのみに絞ります。
sqlContext.sql("select page_id,ad_id from AdLog lateral view explode(ad_ids) ex_adids as ad_id ").show
以下はPythonのコードです。
hiveContext.sql("select page_id,ad_id from AdLog lateral view explode(ad_ids) ex_adids as ad_id ").show()
あとはpage_idごとのcountと、ad_idごとのcountを取得するだけですね。通常集計するのと同様にgroup byでそれぞれ集計します。
以下はpage_idごとの広告の表示回数の集計です。
sqlContext.sql("select page_id,count(ad_id) from AdLog lateral view explode(ad_ids) ex_adids as ad_id group by page_id").show
Pythonでも集計してみます。
hiveContext.sql("select page_id,count(ad_id) from AdLog lateral view explode(ad_ids) ex_adids as ad_id group by page_id").show()
以下はad_idごとの広告の表示回数の集計です。
sqlContext.sql("select ad_id,count(page_id) from AdLog lateral view explode(ad_ids) ex_adids as ad_id group by ad_id").show
以下は同様のPythonコードです。
hiveContext.sql("select ad_id,count(page_id) from AdLog lateral view explode(ad_ids) ex_adids as ad_id group by ad_id").show()
ここまでSparkSQL(HiveSQL)での配列型データをlateral viewを使用して簡単な分析を行ってみました。
Hadoop/Spark基盤をデータレイクとして機能させるために、こうした複雑なデータ形式を整形し、中間データとして保存することで、他のコンポーネントでの利用を可能にするETL処理はとても重要です。