SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

ゼロからはじめるSparkアプリケーション入門

Sparkの主な拡張コンポーネントの紹介と、SparkSQLを利用した簡単な分析

ゼロからはじめるSparkアプリケーション入門(2)


  • X ポスト
  • このエントリーをはてなブックマークに追加

MariaDBとSparkSQLの連携

 次に、SparkSQLからMariaDBへの接続と、データの取得についてコードを交えながら見ていければと思います。

補足

 本連載ではMariaDBのインストールや設定などについては扱いません。

商品の購買ログの補完を行う

 ここではECサイトでの購買ログの補完処理を題材としてみます。

 MariaDBに格納されたユーザーマスタとCSVの購買ログを使って、購買ログに性別の補完を行い、商品の性別ごとの売り上げ個数を出力してみます。

データベースの準備

 まずは今回利用するデータの準備を行っていきます。

 DBからのデータ取得処理を行うので、まずMariaDB側のテーブル作成とデータ作成を行います。前回と同様にデータ量について少量に抑えていますが、大規模なデータセットを扱う場合も基本的な考え方は同様です。

 下記は、でき上がるテーブルのイメージです。

user_id name gender
1 鈴木 F
2 佐藤 M
3 田中 F
4 山田 M

 下記は今回利用するSQL文です。sample_ecの名称でデータベースを作成し、mst_userテーブルを作成しています。

SQL
##データベース sample_ecの作成
create database if not exists sample_ec character set utf8 collate utf8_unicode_ci;

##sample_ecの選択
use sample_ec;

##Table mst_userの作成
create table mst_user(
  user_id int primary key auto_increment,
  name varchar(20) not null,
  gender char(1) not null
  );

##今回利用するサンプルデータの追加
insert into mst_user values(null,"鈴木",'F'),(null,"佐藤",'M'),(null,"田中",'F'),(null,"山田",'M');

購入ログの準備

 次にCSVの購買ログを作成します。

 通常、購買ログには購入日時などの情報がついていますが、サンプルということで購入したユーザーID、購入した商品ID、購入した個数の3つのフィールドからなるCSVを作成しました。下記のデータを「sample_purchase.csv」の名称でファイルに保存します。

CSV:sample_purchase.csv
user_id,product_id,quantity
1,101,1
1,103,1
2,101,3
2,104,1
2,102,1
3,102,1
3,103,1
4,101,2
4,104,1

インタラクティブシェルの起動

 次に環境の準備を行います。この処理はJDBCを利用してSparkからMariaDBに接続するため、MariaDBのコネクタが必要になります。

 また、Downloadサイトから取得した「mariadb-java-client-x.x.x.jar」ファイルをインタラクティブシェルの起動時に指定する必要があります。下記のように、参照可能なMariaDBのコネクタを指定し、spark-shellを起動しています。

bash
SPARK_CLASSPATH=mariadb-java-client-1.4.4.jar spark-shell

 PySparkの場合も同様にSPARK_CLASSPATHを指定することで利用可能です。

bash
SPARK_CLASSPATH=mariadb-java-client-1.4.4.jar pyspark

 以下は.jarを指定した場合のコンソールです。赤枠で表示した部分で指定したコネクタが読み込まれていることが見て取れます。

img11.png

補足

 読み込みが正常に行われない場合は、SPARK_CLASSPATHに指定したPathにコネクタが存在するかどうか、指定したPathがspark-shellを起動するユーザーで参照可能かどうかを確認してください。

分析処理

 ここからは実際の分析処理を以下の手順に沿って行っていきます。

  1. MariaDBへの接続先情報の作成
  2. JDBCでの接続とデータのロード
  3. CSVファイルのスキーマの宣言
  4. CSVファイルの読み込み
  5. CSVファイルのデータフレーム変換
  6. データフレームのtempテーブル登録
  7. 分析処理用のSQLの発行

の順で処理を行っていきます。

 下記は分析処理のコードになります。

Scala
val url = "jdbc:mysql://localhost:3306/sample_ec?user=*****&password=**********" //1. 接続先URLの作成
val table = "mst_user" //1. 接続先のTable
val mst_user_df = sqlContext.read.format("jdbc").options(Map("url" -> url,"dbtable" -> table)).load //2.JDBCでの接続とデータのロード

case class Purchase(user_id:Int, product_id:Int, quantity:Int) //3.CSVファイルのスキーマ宣言

val raw = sc.textFile("./sample_purchase.csv")// 4.CSVファイルの読み込み
val head = raw.first() //5.CSVファイルのデータフレーム変換
val purchase = raw.filter( _ != head).map( _.split(",") ).map( x => Purchase(x(0).toInt, x(1).toInt, x(2).toInt)).toDF().cache //5.CSVファイルのデータフレーム変換
purchase.registerTempTable("purchase") //6.各データフレームのtempテーブル登録
mst_user_df.registerTempTable("mst_user") //6.各データフレームのtempテーブル登録
sqlContext.sql("select product_id,gender,sum(quantity) as summary from purchase as p inner join mst_user as m on p.user_id = m.user_id group by product_id,gender").show //7.分析処理用のSQLの発行

 以下にPythonでのコードを示します。

Python
from pyspark.sql import Row
url = "jdbc:mysql://localhost:3306/sample_ec?user=root&password=Kuro09201015" #1.接続先URLの作成
table = "mst_user" #1.接続先Table
mst_user_df = sqlContext.read.format("jdbc").options(url=url, dbtable=table).load() #2.JDBCでの接続とデータのロード

raw = sc.textFile("./sample_purchase.csv") #4.CSVファイルの読み込み
head = raw.first()
tran  = raw.filter(lambda x: x != head).map(lambda x: x.split(",") ).map(lambda r: Row(user_id=int(r[0]), product_id=int(r[1]), quantity=int(r[2]))) #5.CSVファイルのデータフレーム変換
purchase = sqlContext.createDataFrame(tran).cache()#5.CSVファイルのデータフレーム変換
purchase.registerTempTable("purchase") #6.各データフレームのtempテーブル登録
mst_user_df.registerTempTable("mst_user") #6.各データフレームのtempテーブル登録
sqlContext.sql("select product_id,gender,sum(quantity) as summary from purchase as p inner join mst_user as m on p.user_id = m.user_id group by product_id,gender").show() #7.分析処理用のSQLの発行

補足

 MariaDBの接続に使うユーザーとパスワードは、ご自身の環境のものに置き換えてください。

 以下は処理の実行結果を表示したものです。

img12.png

 このように、SparkSQLを利用することでSQL構文を利用した柔軟な処理を行うことが可能となります。

【PR】

次のページ
キャッシュと遅延評価

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
ゼロからはじめるSparkアプリケーション入門連載記事一覧
この記事の著者

田中 裕一(日本アイ・ビー・エム株式会社)(タナカ ユウイチ)

Web系・広告系企業にて、Hadoop/Spark/Kafka等Hadoopエコシステムを利用した広告システム(DMP)・行動分析基盤・レコメンド基盤の全体アーキテクチャ設計やプログラミング、最適化、行動解析を担当。Spark/Hadoopエコシステムを筆頭にOSSを組み合わせた大規模なアーキテクチ...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/9457 2016/07/04 13:56

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング