ストリーム処理サービス「Amazon Kinesis」をPythonで使う
PyData Tokyoの中の人、池内(@iktakahiro)です。
今回は数あるAmazon Web Serviceのサービスの中で、アマゾンデータサービスジャパンの榎並利晃さんより「Amazon Kinesis」について解説いただける運びとなりました。
Amazon Kinesisとはどういうものなのか、Pythonで利用するにはどのようにすればよいか、という視点で本稿を綴ります。
Amazon Kinesisとは
Amazon Kinesisは、「ストリーミング処理」のテクノロジーに大別されます。同領域では、他にApache Kafka、Stormなどが著名です。Amazon Kinesisは、ストリーミング処理の機能をオンデマンドかつフルマネージドで提供するサービスで、2013年11月に発表され、大きな注目を集めました。
Amazon Kinesisユースケースとして、以下のような例が挙げられていました。
リアルタイム状況把握
- キャンペーンの状況把握
- POSデータからの売上状況把握
異常検知
- センサーの異常検知
- 不正アクセス検知
サービス向上
- ソーシャルデータを用いたリアルタイムサービス
- 直近の行動を基にしたレコメンデーション
センサーデータの利活用は、特にIoTの分野で強い関心を集めるケースと言えます。
Amazon Kinesisのアーキテクチャ
まずAmazon Kinesisのアーキテクチャを整理しましょう。
- ユーザーはStreamを作成する。Streamは1つ以上のShardで構成される。
- ユーザーは、Shardにデータ(Data Record)をPUTすることができる。Shardは、1MB/sec, 1000TPSの性能でデータを受け入れることができる。
- Shardを増加させることで、スループットを向上できる(スケールアウト)。
- Data Recordは24時間保管される。このとき、AWSの複数のAvailabilyty Zone(データセンターのロケーションの概念。以下AZ)にデータが保管され、高耐久性を確保している。
- ユーザーはShardに保管されたData RecordをGETすることができる。Shardからは、2MB/sec, 5TPSの性能でデータを取得できる。
Amazon Kinesisは、巨大なメッセージキューと考えることもできますが、一般的なメッセージキューと比べると、
- Data Recordあたり、50KBのサイズまでのデータを格納できる。
- 取り出したデータは削除されない。再読み込みが可能。
- 容量制限がない(ただし、24時間制約+PUTのスループットの性能により実質的な上限は存在する)。
- AWSのS3やDynamoDBなどと組み合わせ利用されることが念頭に置かれた設計になっている。
などが特に異なる点です。また、Amazon Kinesis自体はデータを加工する、条件分岐をする、といった機能を持っていません。Amazon Kinesis単体で、AWSの他のサービスにデータを連係することもできないため、これらの機能は、ユーザーが自らマネージするインスタンス上に用意する必要があります。Amazon Kinesisをアプリケーションで利用する際のライブラリとして、公式にSDKが提供されています。SDKの話題は、本記事の後半で取り扱います。
データのシャーディング
PUTするData Recordに、「Partition Key」を設定します。Shardは、この「Partition Key」を元にして、Data Recordの行き先をShardに分配します。Partition Keyに偏りが生じるとAmazon Kinesisのスケーラビリティを十分に利用できないので注意が必要です。
シーケンス番号
各Data Recordは、Amazon KinesisのStream中でユニークとなるIDのような番号を持っています。これがシーケンス番号です。シーケンス番号は以下の特徴を持っています。
- Stream内でユニーク値である。
- Data Recordに割り振られたシーケンス番号が変わることはない。
- シーケンス番号は、データの取得に利用される。同一のシーケンス番号からは同一のData Recordが得られる
- 時系列で後からPUTされたData Recordのほうが、大きなシーケンス番号を持っていることが保証される。ただしこの値はインクリメントではない。
データの入出力とSDKの利用
Amazon Kinesisのアーキテクチャを理解したところで、データの入出力についてみていきます。データの入出力は、Amazon KinesisのAPIを利用することになりますが、ここで便利なのが公式から提供されるSDKです。
AWSは、アプリケーション開発に便利なSDKを各言語向けに提供しています。以下がその一例です。
AWS SDK for Python(Boto)でのAmazon Kinesisの利用
では、BotoからAmazon Kinsisに対してデータを入力する例を簡単に紹介します。
from boto import kinesis STREAM_NAME = 'sample-stream' connect = kinesis.connect_to_region('us-east-1') # Streamを作成します。Streamの名前と、Shardの数が指定できます。 connect.create_stream(STREAM_NAME, 1) # 作成したStreamを指定して、Data RecordをPUTします。このときPartition Keyを指定します。 connect.put_record(STREAM_NAME, 'data', 'partition_key')
Amazon Kinesisのユースケースはさまざまですが、いずれの場合も、botoの例で確認したように、Stream、Data Record、Partition Keyの3つを利用してデータをPUTするという点は共通です。
さらに便利な Amazon Kinesis Client Libraryの利用
APIの直接利用はもちろん、botoを利用してもAmazon Kinesisを操作できることが確認できました。しかし、Amazon Kinesisを利用する上で留意する点がいくつかあります。
- Shardはオンラインで増減(スケールアウト)することができるため、取得アプリケーション側で考慮が必要となる
- ストリーミングデータのどこまでの処理が完了したか、シーケンス番号を記録しておく必要がある
こういった、アプリケーション開発者が考慮しなくてはいけない仕事のいくつかを担ってくれるのが、Amazon Kinesis Client Library(KCL)です。
KCLの概要
KCLの主な役割ついて確認します。
- シーケンス番号を設定した間隔でDynamoDBに格納する(Check Pointの記録)。
- KCLを利用した複数アプリケーションを立ち上げると、アプリケーションが利用するShardを自動で選択し、分散する。
- 複数実行されていたアプリケーションのいずれかが障害などで処理を継続できない状態を検知し、残りのアプリケーションが処理を継続する。
- Shardを増加させた場合に、Check Pointの記録を自動で追加する。
耐障害性の考慮やCheck Pointの記録などをKCLに委ねることができ、アプリケーション開発者はビジネスロジックに集中することができます。
KCL for Python
KCLは、長らくJava版のみが提供されてきましたが、2014年10月21日、Amazon Kinesis Client Library for Pythonがリリースされました。
KCL for Pythonのリリースは、PyData Tokyoにとっては格好の話題となった訳ですが、今回の講演で、実はKCL for PythonはPureなPythonライブラリではなく、KCL for Javaの「MultiLangDaemon」という仕組みを利用したライブラリであることが明らかになりました。KCLの他言語対応を見据える上で、コアライブラリはあくまでJava版とし、各言語向けのインターフェースを提供していくという方向性が窺い知れます。
ともかく、PythonのコードでAmazon Kinesisが便利に利用できることになったのは間違いありません。なお、2014年12月14日時点で、Python 3系には対応していません。Python 2.7系で利用しましょう。
# amazon_kclpyのインストール pip install amazon_kclpy
インストール後は、サンプルのとおりに実行すると、DynamoDBにPythonKCLSampleというテーブルが作成されます(図1)。
テーブルの中身は、各シャードごとのCheckpointの情報です(図2)。今回のShardは1つのため、1Shard分の情報が記録されています。
このサンプルの設定は同梱のsample.propertiesに記載されており、リージョンやテーブル名などの指定はsample.propertiesで明示することで行えます。
Kinesisの使いどころ
Kinesisは、これまでのAWSのサービスと比較すると「使いどころが難しい」サービスであるという話を耳にします。その理由として、既存のログ・コレクターツールとの使い分けのイメージが付かない点にあるのではないかと考えられます。ログ・コレクターツールの具体例としては、fluentdが該当します。
特定のデータベースに決められたフォーマットで登録するのみ、といったような単一の用途の場合、Kinesisを利用すると構成が複雑になってしまうことがあります。
また次のようなケースでは、Kinesisの導入は適切ではないと言えます。
1レコードごとのデータの重要性が非常に高い場合
現在、Kinesisのデータの保存期間は、24時間です。従ってシステム障害などで、データを取得できない時間が24時間を経過してしまうとデータをロストしてしまいます。このリスクをどう判断するかということが、重要な基準の一つになり得ます。
1レコードのサイズが大きい場合
現在、1レコードあたり50KBまでという制約が存在します。50KB以内であれば、テキスト以外のデータを入出力することはできますが、その場合このサイズ制限に注意する必要があります。
いずれもKinesisのサービス仕様上の制約ですが、機能向上で解決する可能性はありますので今後に期待したいところです。
Kinesisは「図3. Amazon Kinesisの構成例」で示すように、同一のデータソースを、複数のサービスやニーズに応じて使い分ける場合に有効です。今回の講演の中で榎並さんが強調されていた点でもあります。
KCL for Pythonを足がかりに、ストリーミング処理サービスの開発がPythonでも活発に行われるようになることを楽しみにしています。