MariaDBとSparkSQLの連携
次に、SparkSQLからMariaDBへの接続と、データの取得についてコードを交えながら見ていければと思います。
補足
本連載ではMariaDBのインストールや設定などについては扱いません。
商品の購買ログの補完を行う
ここではECサイトでの購買ログの補完処理を題材としてみます。
MariaDBに格納されたユーザーマスタとCSVの購買ログを使って、購買ログに性別の補完を行い、商品の性別ごとの売り上げ個数を出力してみます。
データベースの準備
まずは今回利用するデータの準備を行っていきます。
DBからのデータ取得処理を行うので、まずMariaDB側のテーブル作成とデータ作成を行います。前回と同様にデータ量について少量に抑えていますが、大規模なデータセットを扱う場合も基本的な考え方は同様です。
下記は、でき上がるテーブルのイメージです。
user_id | name | gender |
---|---|---|
1 | 鈴木 | F |
2 | 佐藤 | M |
3 | 田中 | F |
4 | 山田 | M |
下記は今回利用するSQL文です。sample_ecの名称でデータベースを作成し、mst_userテーブルを作成しています。
##データベース sample_ecの作成 create database if not exists sample_ec character set utf8 collate utf8_unicode_ci; ##sample_ecの選択 use sample_ec; ##Table mst_userの作成 create table mst_user( user_id int primary key auto_increment, name varchar(20) not null, gender char(1) not null ); ##今回利用するサンプルデータの追加 insert into mst_user values(null,"鈴木",'F'),(null,"佐藤",'M'),(null,"田中",'F'),(null,"山田",'M');
購入ログの準備
次にCSVの購買ログを作成します。
通常、購買ログには購入日時などの情報がついていますが、サンプルということで購入したユーザーID、購入した商品ID、購入した個数の3つのフィールドからなるCSVを作成しました。下記のデータを「sample_purchase.csv」の名称でファイルに保存します。
user_id,product_id,quantity 1,101,1 1,103,1 2,101,3 2,104,1 2,102,1 3,102,1 3,103,1 4,101,2 4,104,1
インタラクティブシェルの起動
次に環境の準備を行います。この処理はJDBCを利用してSparkからMariaDBに接続するため、MariaDBのコネクタが必要になります。
また、Downloadサイトから取得した「mariadb-java-client-x.x.x.jar」ファイルをインタラクティブシェルの起動時に指定する必要があります。下記のように、参照可能なMariaDBのコネクタを指定し、spark-shellを起動しています。
SPARK_CLASSPATH=mariadb-java-client-1.4.4.jar spark-shell
PySparkの場合も同様にSPARK_CLASSPATHを指定することで利用可能です。
SPARK_CLASSPATH=mariadb-java-client-1.4.4.jar pyspark
以下は.jarを指定した場合のコンソールです。赤枠で表示した部分で指定したコネクタが読み込まれていることが見て取れます。
補足
読み込みが正常に行われない場合は、SPARK_CLASSPATHに指定したPathにコネクタが存在するかどうか、指定したPathがspark-shellを起動するユーザーで参照可能かどうかを確認してください。
分析処理
ここからは実際の分析処理を以下の手順に沿って行っていきます。
- MariaDBへの接続先情報の作成
- JDBCでの接続とデータのロード
- CSVファイルのスキーマの宣言
- CSVファイルの読み込み
- CSVファイルのデータフレーム変換
- データフレームのtempテーブル登録
- 分析処理用のSQLの発行
の順で処理を行っていきます。
下記は分析処理のコードになります。
val url = "jdbc:mysql://localhost:3306/sample_ec?user=*****&password=**********" //1. 接続先URLの作成 val table = "mst_user" //1. 接続先のTable val mst_user_df = sqlContext.read.format("jdbc").options(Map("url" -> url,"dbtable" -> table)).load //2.JDBCでの接続とデータのロード case class Purchase(user_id:Int, product_id:Int, quantity:Int) //3.CSVファイルのスキーマ宣言 val raw = sc.textFile("./sample_purchase.csv")// 4.CSVファイルの読み込み val head = raw.first() //5.CSVファイルのデータフレーム変換 val purchase = raw.filter( _ != head).map( _.split(",") ).map( x => Purchase(x(0).toInt, x(1).toInt, x(2).toInt)).toDF().cache //5.CSVファイルのデータフレーム変換 purchase.registerTempTable("purchase") //6.各データフレームのtempテーブル登録 mst_user_df.registerTempTable("mst_user") //6.各データフレームのtempテーブル登録 sqlContext.sql("select product_id,gender,sum(quantity) as summary from purchase as p inner join mst_user as m on p.user_id = m.user_id group by product_id,gender").show //7.分析処理用のSQLの発行
以下にPythonでのコードを示します。
from pyspark.sql import Row url = "jdbc:mysql://localhost:3306/sample_ec?user=root&password=Kuro09201015" #1.接続先URLの作成 table = "mst_user" #1.接続先Table mst_user_df = sqlContext.read.format("jdbc").options(url=url, dbtable=table).load() #2.JDBCでの接続とデータのロード raw = sc.textFile("./sample_purchase.csv") #4.CSVファイルの読み込み head = raw.first() tran = raw.filter(lambda x: x != head).map(lambda x: x.split(",") ).map(lambda r: Row(user_id=int(r[0]), product_id=int(r[1]), quantity=int(r[2]))) #5.CSVファイルのデータフレーム変換 purchase = sqlContext.createDataFrame(tran).cache()#5.CSVファイルのデータフレーム変換 purchase.registerTempTable("purchase") #6.各データフレームのtempテーブル登録 mst_user_df.registerTempTable("mst_user") #6.各データフレームのtempテーブル登録 sqlContext.sql("select product_id,gender,sum(quantity) as summary from purchase as p inner join mst_user as m on p.user_id = m.user_id group by product_id,gender").show() #7.分析処理用のSQLの発行
補足
MariaDBの接続に使うユーザーとパスワードは、ご自身の環境のものに置き換えてください。
以下は処理の実行結果を表示したものです。
このように、SparkSQLを利用することでSQL構文を利用した柔軟な処理を行うことが可能となります。