はじめに
前回に引き続き、HBaseをSQLで扱えるライブラリPhoenixを扱っていきたいと思います。今回はPhoenixの高度な機能である、トランザクションとセカンダリインデックスを使ってみたいと思います。
ところで先日、PhoenixはApacheプロジェクトのインキュベータになったようです。PhoenixのWebページも以下に移動したようなので、こちらを参考にしていただければと思います。
対象読者
- HBaseを使ってみたいけど、どう使ったらよいか分からない方
- MySQLなどのRDB以外のデータベースを使ってみたい方
コプロセッサについて
Phoenixの高度な機能の説明に入る前に、これまでの連載では触れてきませんでしたが、HBaseの機能の一つである「コプロセッサ」について説明します。
コプロセッサは、任意のコードを直接HRegionServer上で実行できるフレームワークです。Phoenixでは、一部の機能をこのコプロセッサを使用して実現しています。
コプロセッサを用いると、クライアントサイドで行っていた処理をサーバサイドで行うことが可能となり、処理によってはより効率的にすることができます。また、コプロセッサには、オブザーバとエンドポイントの2種類あります。
今回はコプロセッサの実装方法については触れませんが、それぞれの概要について簡単に説明します。
オブザーバ
オブザーバは、特定のイベントが発生した際に、処理を追加できる機能です。RDBMSでいうトリガに近い機能になります。
以下、フックできるイベントの例になります。
- Regionの状態変化
- FlushやCompactionやSplitの発生や終了
- Get、Put、Delete、CheckAndPutなどのクライアントAPIの処理
- WAL(Write Ahead Log)の書き込みやファイルローテションなど
- Tableの作成や削除など
- Regionのmoveやバランシングなど
エンドポイント
エンドポイントは、呼び出し可能なリモートプロシージャを追加できる機能です。RDBMSでいうと、ストアドプロシージャに近い機能です。
エンドポイントを用いると、例えば、各HRegionServer上でローカルなデータに対して演算処理をさせて、その結果をクライアントサイドで集計するという処理を行うことができます。
トランザクション
Phoenixのトランザクションについて説明します。これまでの連載の中でも説明してきましたが、HBaseには基本的にはRow内に限定されたトランザクションしかありません。Phoenixでも、現在のところ、HBaseを超えたトランザクションは提供されていません。
Phoenixのトランザクションの実装を簡単に説明します。
まず、UPSERT文やDELETE文などの変更を即座にHBaseに送ることなくクライアント側でバッファリングします。
そして、コミットする場合はそれらをHBaseに送り、ロールバックする場合は変更を捨てるというロジックです。
分離レベルとしてはREAD COMMITTEDをサポートしています。ただし、同じトランザクション内でのコミットされていない変更に関しては反映されません。当然、READ COMMITTEDなので、同じトランザクション中でも同じデータを読み込むたびに値が変わってしまう現象が発生します(ファントムリード、ノンリピータブルリード)。
以下は、Javaのコードでのトランザクションの例です。
Connection connection = null; Statement statement = null; ResultSet resultSet1 = null; ResultSet resultSet2 = null; try { connection = DriverManager.getConnection("jdbc:phoenix:localhost"); connection.setAutoCommit(false); statement = connection.createStatement(); // テーブル作成 statement.executeUpdate( "CREATE TABLE TBL (" + "COL1 VARCHAR NOT NULL PRIMARY KEY," + "COL2 INTEGER" + ")"); // 更新 statement.executeUpdate("UPSERT INTO TBL VALUES ('aaa', 1)"); statement.executeUpdate("UPSERT INTO TBL VALUES ('bbb', 2)"); statement.executeUpdate("UPSERT INTO TBL VALUES ('ccc', 3)"); // 参照 resultSet1 = statement.executeQuery("SELECT * FROM TBL"); while (resultSet1.next()) { // 上のUPSERT文は反映されていない System.out.println(resultSet1.getString(1) + "," + resultSet1.getInt(2));; } // 参照(2回目) resultSet2 = statement.executeQuery("SELECT * FROM TBL"); while (resultSet2.next()) { // 1回目の参照と同じ結果が返ってくるとは限らない System.out.println(resultSet2.getString(1) + "," + resultSet2.getInt(2));; } // コミット connection.commit(); } catch (Exception e) { // ロールバック connection.rollback(); } finally { if (resultSet1 != null) resultSet1.close(); if (resultSet2 != null) resultSet2.close(); if (statement != null) statement.close(); if (connection != null) connection.close(); }