実装
それでは、実際にコードを書いていきましょう。前回同様、コードが重複している部分が多々ありますが、説明のためにそのままにしています。また、本質でない部分のコードは省略しています。
今回使用するTableを、HBase shellで以下のように作成しておきます。
$ bin/hbase shell hbase(main):003:0> create "access", {NAME => "h"}, {NAME => "d"}, {NAME => "t"} 0 row(s) in 1.1870 seconds
アクセスのカウント
それでは、まずアクセスをカウントするコードからです。コードは以下のようになります。
// アクセスをカウントする public void count(String domain, String path, int amount) throws IOException { // RowKeyの作成 String reversedDomain = reverseDomain(domain); // reverse domainの作成 byte[] row = createRow(reversedDomain, path); // Incrementオブジェクトの生成 Increment increment = new Increment(row); increment.addColumn(HOURLY_COLUMN_FAMILY, createHourlyQualifier(), amount); // アワリー increment.addColumn(DAILY_COLUMN_FAMILY, createDailyQualifier(), amount); // デイリー increment.addColumn(TOTAL_COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY, amount); // トータル // インクリメント HTableInterface table = hTablePool.getTable(TABLE); try { table.increment(increment); } finally { table.close(); } } // リバースドメインを作成する private String reverseDomain(String domain) { String[] split = domain.split("\\.", 2); if (split.length == 1) return domain; return reverseDomain(split[1]) + "." + split[0]; } // RowKeyを作成する。reversedDomainとpathをタブ区切りで連結 private byte[] createRow(String reversedDomain, String path) { return Bytes.toBytes(reversedDomain + "\t" + path); } // 現在時刻から、アワリーのColumn名を作成する private byte[] createHourlyQualifier() { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH"); return Bytes.toBytes(sdf.format(new Date())); } // 現在時刻から、デイリーのColumn名を作成する private byte[] createDailyQualifier() { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); return Bytes.toBytes(sdf.format(new Date())); }
RowKeyを作成し、アワリー、デイリー、トータルに対してインクリメントを行っています。
今回は、アワリー、デイリー、トータルを1つのRowKeyにマッピングしているので、Incrementオブジェクトを生成しインクリメントを行っています。
前回も説明しましたが、Incrementオブジェクトを用いたインクリメントは、読み込みに対してアトミックではありません。一部だけインクリメントされた状態を読み込むことがあるので、注意が必要です。
アワリーのアクセスを取得
次に、アワリーのアクセスを取得するコードです。
// アワリーのアクセスを取得する public List<Access> getHourlyCount(String domain, String path, Calendar startHour, Calendar endHour) throws IOException { // Scanの作成 Scan scan; String reversedDomain = reverseDomain(domain); if (path != null) { byte[] row = createRow(reversedDomain, path); scan = new Scan(row, row); } else { byte[] prefix = Bytes.toBytes(reversedDomain); scan = new Scan(prefix); scan.setFilter(new PrefixFilter(prefix)); } SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH"); // startHourとendHourから取得したいColumnを指定 Calendar cal = (Calendar) startHour.clone(); while (cal.before(endHour)) { scan.addColumn(HOURLY_COLUMN_FAMILY, Bytes.toBytes(sdf.format(cal.getTime()))); cal.add(Calendar.HOUR_OF_DAY, 1); } // 結果の取得 List<Access> ret = new ArrayList<Access>(); HTableInterface table = hTablePool.getTable(TABLE); try { ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { String[] domainAndPath = extractDomainAndPath(result.getRow()); for (Map.Entry<byte[], byte[]> entry: result.getFamilyMap(HOURLY_COLUMN_FAMILY).entrySet()) { byte[] qualifier = entry.getKey(); byte[] value = entry.getValue(); // 時間 Calendar time = Calendar.getInstance(); time.setTime(sdf.parse(Bytes.toString(qualifier))); // Accessオブジェクトの作成 Access access = new Access(); access.setTime(time); access.setDomain(reverseDomain(domainAndPath[0])); access.setPath(domainAndPath[1]); access.setCount(Bytes.toLong(value)); ret.add(access); } } } catch (ParseException e) { // 省略 } finally { table.close(); } return ret; } // RowKeyからドメインとパスを抽出する private String[] extractDomainAndPath(byte[] row) { return Bytes.toString(row).split("\t"); }
Scanオブジェクトを作成する際に、pathがnullでない場合は、startRowとstopRowにリバースドメインとパスを連結したものを指定しています。
ScanオブジェクトのstartRowとstopRowに同じものを指定すると、そのRowKeyのデータのみが取得できます。
pathがnullの場合は、リバースドメインをプレフィックスにしてScanを行います。
前回紹介したパーシャルScanを用いても良いのですが、今回は、PrefixFilterを用いて実現することにしました。この場合も、startRowを設定する必要があります。