マイクロサービスアーキテクチャを採用
hacciはGCP上で構築されています。ここからは入力部・処理部・出力部に分けて解説していきます。それぞれは、非常に単純な機能の組み合わせです。先に書いた「運用がしやすいこと」「継続的な改善ができること」を実現するためには、小さな機能に分割してつなぐ(マイクロサービスアーキテクチャ)設計が適切だったからです。
入力部
データパイプラインとしてのhacciにデータを入れるために、以下のGCPコンポーネントを使っています。
Cloud Functionsへの入力
Cloud FunctionsのHTTPトリガーを使い、データ入力のHTTPインタフェースを公開しています。利用者は、入力したいデータをHTTP POSTでこのインタフェースに投げ、hacciの出力部に届くのを待つだけです。
hacciの内部では、HTTPインタフェースにPOSTされたボディの内容を入力データとし、Cloud Pub/Subのメッセージとして蓄積します。
Cloud Storageへの入力
小さなデータであれば、上のCloud FunctionsのHTTPトリガーで受け取れますが、大きめのデータをバッチ的に入力しようとすると、HTTPインタフェースは親切ではありません。そして、こうした大きめのデータというのは、すでにファイルとしてストレージ上に保存されている場合がほとんどです。
例えば「サーバーマシン上の、あるディレクトリ以下のファイルを全部入力として渡したい」などの要件に答えるためには、よりストレージ To ストレージなインタフェースが好ましいでしょう。そのために、Cloud Storageのバケットへの書き込みインタフェースを利用者に提供しています。
hacci内部では、Cloud FunctionsのCloud Storageトリガーを使い、バケットへデータが書き込まれたことを検知・そのデータの内容を取得し、Cloud Pub/Subのメッセージとして蓄積します。
Cloud Pub/Subへの入力
上で取り上げた、Cloud Functionsのインタフェース・Cloud Functionsのインタフェースはともに、その入力内容はCloud Pub/Subへ送られています。このCloud Pub/Subは、データ入力用にも公開しているデータハブです。
Cloud Pub/Subに入るメッセージは、hacciの出力に出てくるべき具体的なデータの内容です。利用者向けから入力部として見えるCloud FunctionsやCloud Storageも、どうすれば利用者がhacciにデータを楽に送れるかを考え、インタフェースの選択肢として提供するべく公開しているものです。
処理部
入力部から入ってきて、データハブとしてのCloud Pub/Subに集約されたデータは、処理部にあるCloud Dataflow上に置かれたストリーミングデータ処理アプリケーションにて、定型的な処理が施されます。
Cloud Dataflowの役割
入力部の節では、Cloud Pub/Subに入るメッセージの内容は「具体的なデータの内容」と書きましたが、より詳細に説明すると以下の要素が含まれます。
- データの内容
- データの形式(現在はCSV、TSV、JSON、ProtocolBuffersバイナリをサポート)
- データのスキーマ(ProtocolBuffersスキーマ形式で記述)
- データ入力時タイムスタンプ、データごとにユニークなIDなどのメタデータ
- 出力部にある、どの出力にデータを流すのかの宛先
Cloud Dataflowアプリケーションは、「データの形式」と「データのスキーマ」をデータを読む手段として保持し、「データの内容」を解釈します。CSVデータがきたことを想定した処理例を示します。
data = [データの内容] data_format = [データの形式] data_schema = [データのスキーマ] # element0, element1 という要素名とその要素値の型が定義されている data_object = HacciObject<data_schema> # ProtocolBuffers形式の箱 # data_object.element0, data_object.element1 という変数が型とともに定義される switch (data_format) : case “csv”: input_data = CSVReader(data) data_object.element0 = input_data[0] data_object.element1 = input_data[1]
このように、「データの形式」と「データのスキーマ」さえ明示されれば、どのような形式の「データの内容」が入力部から来てもHacciObjectというhacci内部共通形式にデータを詰め込むことが可能で、この箱に入ったデータを次の処理(出力)に移すことができます。
出力部
Cloud Dataflowアプリケーションによってhacci内部共通形式に詰め込まれたデータは、いよいよ利用者のために出力されます。hacciでは、以下のコンポーネントを出力先として提供しています。
- Cloud Pub/Sub
- Cloud Bigtable
- BigQuery
Cloud Pub/Subへの出力
Cloud Storage入力ではあまり需要がない出力ですが、Cloud Functions(HTTPトリガー)入力やCloud Pub/Sub入力で入ってきたデータを、ストリーミングデータとして受け取りたいと利用者が考えたとき、この出力インタフェースが採用されます。「利用者がすでにストリーミングデータ処理に慣れている」「Cloud DataflowやSpark Streamingなどでアプリケーションを実装できる能力を有している」といったことが多いようです。
入力と出力のいずれにもCloud Pub/Subを用いた場合、うまみはないのではないかと思われる方もいらっしゃるかもしれませんが、実はhacciの処理部でHacciObjectというスキーマが明示な共通形式となって出力のCloud Pub/Subにメッセージとして流れるため、後続でデータを解釈しやすいという利点があるのです。すなわちデータの種類だけ解釈するためのコードを書く必要はなく、ある程度共通処理としてデータ(=HacciObject)を解釈するコードが書ける、ということです。
Cloud Bigtableへの出力
入力したデータのある要素をキーとし、残りの要素を参照するためのAPIが欲しいという要件より、このCloud Bigtable出力が実装されました。例えば、Cloud Storageに置いたJSONファイルから、各要素キーをCloud Bigtableのカラムとして、各要素値をカラムの値として出力します。Cloud Bigtableを参照しているAppEngineが、HTTPリクエストに含まれる要素キーから、Cloud Bigtableで対応するカラム値、またはそれにひも付く要素群を取り出します。
以上が基本機能ですが、実際はhacciのCloud Pub/Sub出力からCloud Dataflowアプリケーションで処理し、その処理結果をCloud Bigtableに入れて、よりリッチな結果をAppEngineで参照する、といった使われ方が多いのが実態です。
BigQueryへの出力
hacciが入ってくるデータに対してスキーマを意識し続けた理由が、このBigQuery出力にあります。BigQueryは、RDBと同じようにテーブルに対しスキーマを持ち、テーブルを読み書きする際にはこのスキーマを知っていなければいけません。hacciでは、Protocol Buffers形式で利用者に記述してもらったスキーマをBigQueryスキーマに変換するためにGCPの公式Githubで提供されている「protoc-gen-bq-schema」を活用しています。これは、ProtocolBuffers形式で書かれたスキーマをプログラムで扱うためのコンパイラprotocのプラグインとして動作し、BigQueryスキーマを出力します。
BigQueryはデータサイエンティストにとって、とても便利なツールであるようです。しかし、分析に使うデータは、サーバー上であったり、アクセスが難しいデータベース上であったり、はたまたオープンデータとして乱雑にWeb上に置かれていたり、得てして「そのままBigQueryに投入するには難しい場所」にあるものです。そういったデータサイエンティストが感じる「難しさ」を解消するためにも、hacciのデータパイプラインは貢献しています。
まとめ
世の中が変化し、リアルタイムな行動を捉えた「おもてなし」が求められている
- ユーザー行動が短時間化(マイクロモーメント)
- 「一見さん」ユーザーに対しては、デイリーバッチ処理では十分におもてなしできない
- 既成品の課題(カスタマイズ、コスト)
上記の背景からデータ基盤「hacci」を内製
- hacciは「リクルートのWebサービスを使うユーザーのデータをリアルタイムに収集・分析・活用するためのプラットフォーム」
hacciの特徴
- GCP上で実装されており、入力部・処理部・出力部に分かれる
- それぞれの機能を単純化しつなぐことで、運用負荷低減、継続的な改善を実現
連載の第2回では、このプラットフォームを使った事例を紹介します。