ブログ記事の取得
次に、ブログ記事の取得のコードになります。以下は、最新順で取得するコードです。
// ブログ記事の取得(最新順) public List<Article> getArticles(long userId, int length, Article lastArticle) throws IOException { // Scanオブジェクト Scan scan = new Scan(); if (lastArticle == null) { scan.setStartRow(createStartRow(userId)); } else { // ページング処理 scan.setStartRow(createPagingStartRow(userId, lastArticle.getPostAt(), lastArticle.getArticleId())); } scan.setStopRow(createStopRow(userId)); List<Article> ret = new ArrayList<Article>(); // プールからHTableInterfaceを取得 HTableInterface table = hTablePool.getTable(TABLE); ResultScanner scanner = null; try { // ResultScannerの取得 scanner = table.getScanner(scan); for (Result result : scanner) { // データを取得 byte[] value = result.getValue(COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY); // データのデシリアライズ Article article = deserialize(value); ret.add(article); if (ret.size() >= length) { break; } } } finally { if (scanner != null) { scanner.close(); } table.close(); } return ret; } // startRowの作成。hash(userId)-userId-0 private byte[] createStartRow(long userId) { ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 1); // int型 + long型 + byte型 buffer.putInt(hash(userId)); // hash(userId) buffer.putLong(userId); // userId buffer.put((byte) 0); // 0 return buffer.array(); } // stopRowの作成。hash(userId)-userId-(0 + 1) private byte[] createStopRow(long userId) { ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 1); // int型 + long型 + byte型 buffer.putInt(hash(userId)); // hash(userId) buffer.putLong(userId); // userId buffer.put((byte) (0 + 1)); // 0 + 1.パーシャルScan return buffer.array(); } // ページング用startRowの作成。hash(userId)-userId-0-(Long.MAX_VALUE - lastPostAt)-(lastArticleId + 1) private byte[] createPagingStartRow(long userId, long lastPostAt, long lastArticleId) { ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 1 + 8 + 8); // int型 + long型 + byte型 + long型 + long型 buffer.putInt(hash(userId)); // hash(userId) buffer.putLong(userId); // userId buffer.put((byte) 0); // 0 buffer.putLong(Long.MAX_VALUE - lastPostAt); // Long.MAX_VALUE - lastPostAt buffer.putLong(lastArticleId + 1); // articleId + 1.最後の記事は含めないので+1する return buffer.array(); } // byte[]からArticleにデシリアライズする private Article deserialize(byte[] bytes) { // 省略 }
Scanオブジェクトを生成し、startRowとstopRowを指定してResultScannerを取得し、結果をデシリアライズして返しています。ここでのポイントは、startRowとstopRowをどのように指定するかということです。
ここでパーシャルScanというテクニックを紹介します。Scan時にstartRowとstopRowを指定するとき、本来はそれぞれを正確に指定したいところですが、いつもstopRowが分かるわけではありません。今回の簡易ブログサービスでも、正確なstopRowを指定するためには、ユーザが最初に投稿した記事のpostAtとarticleIdを覚えておかなければなりません。
そこで、stopRowにstartRow+1を指定してあげるのがパーシャルScanです。今回の簡易ブログサービスでは、startRowにhash(userId)-userId-0を指定し、stopRowにhash(userId)-userId-(0+1)を指定しています。このようにすることで、複雑なことをせずにScanを実行できます。
次に、ページングについてです。勘の良い読者であればもうお分かりでしょう。lastArticleを引数として受け取っているので、そのpostAtとarticleIdを用いてhash(userId)-userId-0-(Long.MAX_VALUE-lastPostAt)-(lastArticleId+1)のRowKeyを作成しstartRowに指定します。lastArticleIdに1を足しているのは、最後の記事を含めないためです。
このようにして、前の結果の次の記事からScanを始めることができるので、高速なページングを実現することができます。
以下は、カテゴリ別で取得するコードです。基本的に、最新順で取得するコードと同じです。違いはセカンダリインデックス用のスキーマに対して、Scanを行っていることです。
// ブログ記事の取得(カテゴリ別) public List<Article> getArticles(long userId, int categoryId, int length, Article lastArticle) throws IOException { // Scanオブジェクト Scan scan = new Scan(); if (lastArticle == null) { scan.setStartRow(createSecondaryIndexStartRow(userId, categoryId)); } else { // ページング処理 scan.setStartRow(createSecondaryIndexPagingStartRow(userId, categoryId, lastArticle.getPostAt(), lastArticle.getArticleId())); } scan.setStopRow(createSecondaryIndexStopRow(userId, categoryId)); List<Article> ret = new ArrayList<Article>(); // プールからHTableInterfaceを取得 HTableInterface table = hTablePool.getTable(TABLE); ResultScanner scanner = null; try { scanner = table.getScanner(scan); for (Result result : scanner) { byte[] value = result.getValue(COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY); Article article = deserialize(value); ret.add(article); if (ret.size() >= length) { break; } } } finally { if (scanner != null) { scanner.close(); } table.close(); } return ret; } // セカンダリインデックスのstartRowの作成。hash(userId)-userId-1-categoryId private byte[] createSecondaryIndexStartRow(long userId, int categoryId) { ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 1 + 4); // int型 + long型 + byte型 + int型 buffer.putInt(hash(userId)); // hash(userId) buffer.putLong(userId); // userId buffer.put((byte) 1); // 1 buffer.putInt(categoryId); // categoryId return buffer.array(); } // セカンダリインデックスのstopRowの作成。hash(userId)-userId-1-(categoryId + 1) private byte[] createSecondaryIndexStopRow(long userId, int categoryId) { ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 1 + 4); // int型 + long型 + byte型 + int型 buffer.putInt(hash(userId)); // hash(userId) buffer.putLong(userId); // userId buffer.put((byte) 1); // 1 buffer.putInt(categoryId + 1); // categoryId + 1.パーシャルScan return buffer.array(); } // セカンダリインデックスのページング用startRowの作成。hash(userId)-userId-1-categoryId-(Long.MAX_VALUE - lastPostAt)-(lastArticleId + 1) private byte[] createSecondaryIndexPagingStartRow(long userId, int categoryId, long lastPostAt, long lastArticleId) { ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 1 + 4 + 8 + 8); // int型 + long型 + byte型 + int型 + long型 + long型 buffer.putInt(hash(userId)); // hash(userId) buffer.putLong(userId); // userId buffer.put((byte) 1); // 1 buffer.putInt(categoryId); // categoryId buffer.putLong(Long.MAX_VALUE - lastPostAt); // Long.MAX_VALUE - lastPostAt buffer.putLong(lastArticleId + 1); // articleId + 1.最後の記事は含めないので+1する return buffer.array(); }
まとめ
今回は、HBaseを使って簡易ブログサービスを作りました。実際にアプリケーションを作ることで、HBaseのスキーマ設計についてイメージがついたのではないでしょうか。次回は、HBaseを使ったアプリケーションの別のケーススタディを紹介したいと思います。
また、株式会社サイバーエージェントでは、Hadoop/HBaseエンジニアを募集しています。ご興味のある方はこちらからエントリーしていただければと思います。エンジニア>R&Dエンジニア>R&Dエンジニアを選択しエントリーしていただければ幸いです。