SparkSQLでのプログラミング
それでは、早速SparkSQLでのプログラミングを行っていきます。ここではインタラクティブシェルを用いて、CSVファイルへのSQL操作とMariaDBのTable操作を行います。
補足
プログラミングの習得にフォーカスするため、今回HiveMetaStoreを利用したHiveQLの説明は行いません。
csvファイルの読み込みとSQLでの操作
連載の前回で利用したsample_transaction.csvファイルを利用して、簡単にSparkSQLに触れてみます。まずはSparkでSQLを使うため、テンポラリテーブルとして"transaction"という名前のテーブルを宣言します。
- 読み込むCSVファイルのスキーマの宣言
- CSVファイルの読み込み
- CSVからデータフレームへの変換
- sqlContextへのテンポラリテーブルの登録
case class transaction(id:Int,transaction_id:String,name:String,quantity:Int,price:Int) // 1.スキーマの宣言 val raw = sc.textFile("./sample_transaction.csv") // 2.csvの読み込み val head = raw.first() val tran = raw.filter( _ != head).map( _.split(",") ).map(x => transaction(x(0).toInt,x(1).toString,x(2).toString,x(3).toInt,x(4).toInt)).toDF().cache() // 3.データフレームに変換 tran.registerTempTable("transaction") // 4.テンポラリテーブルとして登録
以下はPySparkの場合です。基本的な流れは同様ですが、Scalaと比べてスキーマの宣言が緩やかです。
from pyspark.sql import SQLContext, Row #ライブラリのインポート raw = sc.textFile("./sample_transaction.csv") #2.csvの読み込み head = raw.first() tran = raw.filter(lambda x: x != head).map(lambda r: r.split(",")).map(lambda r: Row(id=int(r[0]), transaction_id=r[1], name=r[2], quantity=int(r[3]), price=int(r[4]))) sqlContext = SQLContext(sc) df = sqlContext.createDataFrame(tran) #3.データフレームに変換 df.registerTempTable("transaction") #4.テンポラリテーブルとして登録
宣言したテーブルに対して、まずは簡単なSQLをScala、Pythonで発行してみます。
sqlContext.sql("select * from transaction ").show
sqlContext.sql("select * from transaction").show()
transactionテーブルのデータをすべて表示するSQLです。SQLの結果に対し、show関数を発行することでコンソール上にテーブルの内容が表示されます。
以下はコンソール画面のキャプチャです。
今回のケースではレコード数が8件と少ないため、すべてのレコードを表示可能ですが、BigDataを扱う場合には件数が膨大なため、表示も注意が必要です。show関数は表示する件数を渡すことが可能なので、テーブルの内容を何件か見たい場合などに利用しましょう。
以下は、レコードを3件取得する処理とコンソール画面のキャプチャです。
sqlContext.sql("select * from transaction ").show(3)
Pythonの場合も同様です。
sqlContext.sql("select * from transaction").show(3)
トランザクションごとの購入価格のサマリー
連載の前回で、RDDに対するreduce処理を利用したトランザクションごとの購入価格のサマリーについて、SparkSQLを使って処理してみます。
sqlContext.sql("select transaction_id,sum(quantity * price) as summary from transaction group by transaction_id").show
以下はコンソール画面のキャプチャです。
SQLが処理の主体なため、Pythonの場合もほとんど記述は変わりません。
sqlContext.sql("select transaction_id,sum(quantity * price) as summary from transaction group by transaction_id").show()
日頃使い慣れたSQLでデータ処理が行えるのは便利ですね。