SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

ゼロからはじめるSparkアプリケーション入門

Sparkの主な拡張コンポーネントの紹介と、SparkSQLを利用した簡単な分析

ゼロからはじめるSparkアプリケーション入門(2)


  • X ポスト
  • このエントリーをはてなブックマークに追加

SparkSQLでのプログラミング

 それでは、早速SparkSQLでのプログラミングを行っていきます。ここではインタラクティブシェルを用いて、CSVファイルへのSQL操作とMariaDBのTable操作を行います。

補足

 プログラミングの習得にフォーカスするため、今回HiveMetaStoreを利用したHiveQLの説明は行いません。

csvファイルの読み込みとSQLでの操作

 連載の前回で利用したsample_transaction.csvファイルを利用して、簡単にSparkSQLに触れてみます。まずはSparkでSQLを使うため、テンポラリテーブルとして"transaction"という名前のテーブルを宣言します。

  1. 読み込むCSVファイルのスキーマの宣言
  2. CSVファイルの読み込み
  3. CSVからデータフレームへの変換
  4. sqlContextへのテンポラリテーブルの登録
Scala
case class transaction(id:Int,transaction_id:String,name:String,quantity:Int,price:Int) // 1.スキーマの宣言
val raw = sc.textFile("./sample_transaction.csv") // 2.csvの読み込み
val head = raw.first()
val tran = raw.filter( _ != head).map( _.split(",") ).map(x => transaction(x(0).toInt,x(1).toString,x(2).toString,x(3).toInt,x(4).toInt)).toDF().cache() // 3.データフレームに変換
tran.registerTempTable("transaction") // 4.テンポラリテーブルとして登録

 以下はPySparkの場合です。基本的な流れは同様ですが、Scalaと比べてスキーマの宣言が緩やかです。

Python
from pyspark.sql import SQLContext, Row #ライブラリのインポート
raw = sc.textFile("./sample_transaction.csv") #2.csvの読み込み
head = raw.first()
tran = raw.filter(lambda x: x != head).map(lambda r: r.split(",")).map(lambda r: Row(id=int(r[0]), transaction_id=r[1], name=r[2], quantity=int(r[3]), price=int(r[4])))
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(tran) #3.データフレームに変換
df.registerTempTable("transaction") #4.テンポラリテーブルとして登録

 宣言したテーブルに対して、まずは簡単なSQLをScala、Pythonで発行してみます。

Scala
sqlContext.sql("select * from transaction ").show
Python
sqlContext.sql("select * from transaction").show()

 transactionテーブルのデータをすべて表示するSQLです。SQLの結果に対し、show関数を発行することでコンソール上にテーブルの内容が表示されます。

 以下はコンソール画面のキャプチャです。

img2.png

 今回のケースではレコード数が8件と少ないため、すべてのレコードを表示可能ですが、BigDataを扱う場合には件数が膨大なため、表示も注意が必要です。show関数は表示する件数を渡すことが可能なので、テーブルの内容を何件か見たい場合などに利用しましょう。

 以下は、レコードを3件取得する処理とコンソール画面のキャプチャです。

Scala
sqlContext.sql("select * from transaction ").show(3)
img3.png

 Pythonの場合も同様です。

Python
sqlContext.sql("select * from transaction").show(3)

トランザクションごとの購入価格のサマリー

 連載の前回で、RDDに対するreduce処理を利用したトランザクションごとの購入価格のサマリーについて、SparkSQLを使って処理してみます。

Scala
sqlContext.sql("select transaction_id,sum(quantity * price) as summary from transaction  group by transaction_id").show

 以下はコンソール画面のキャプチャです。

img4.png

 SQLが処理の主体なため、Pythonの場合もほとんど記述は変わりません。

Python
sqlContext.sql("select transaction_id,sum(quantity * price) as summary from transaction  group by transaction_id").show()

 日頃使い慣れたSQLでデータ処理が行えるのは便利ですね。

【PR】

次のページ
複雑な構造のファイルとSparkSQLでのマッピング

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
ゼロからはじめるSparkアプリケーション入門連載記事一覧
この記事の著者

田中 裕一(日本アイ・ビー・エム株式会社)(タナカ ユウイチ)

Web系・広告系企業にて、Hadoop/Spark/Kafka等Hadoopエコシステムを利用した広告システム(DMP)・行動分析基盤・レコメンド基盤の全体アーキテクチャ設計やプログラミング、最適化、行動解析を担当。Spark/Hadoopエコシステムを筆頭にOSSを組み合わせた大規模なアーキテクチ...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/9457 2016/07/04 13:56

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング