実装
それでは、実際にコードを書いていきましょう。コードが重複している部分が多々ありますが、説明のためにそのままにしています。また、本質でない部分のコードは省略しています。
ブログ記事投稿
ブログ記事投稿のコードは、以下のようになります。
// ColumnFamily名 private static final byte[] COLUMN_FAMILY = Bytes.toBytes("d"); // Table名 private static final String TABLE = "blog"; // HTablePoolの初期化処理は省略 private final HTablePool hTablePool; // ブログ記事投稿 public void postArticle(long userId, String title, String content, int categoryId) throws IOException { long postAt = System.currentTimeMillis(); // 投稿日時 long updateAt = postAt; // 更新日時 String userName = getUserName(userId); // ユーザ名の取得 String cagegoryName = getCategoryName(categoryId); // カテゴリ名の取得 long articleId = createArticleId(); // 記事IDの生成 // データのシリアライズ byte[] serializedData = serialize(articleId, userId, userName, title, content, categoryId, cagegoryName, postAt, updateAt); // Putオブジェクトの作成(最新順) byte[] row = createRow(userId, postAt, articleId); Put put = new Put(row, postAt); // Timestampにデータを追加するときの時間を指定 put.add(COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY, serializedData); // セカンダリインデックス(カテゴリ別)のPutオブジェクトの作成 byte[] secondaryIndexRow = createSecondaryIndexRow(userId, categoryId, postAt, articleId); Put secondaryIndexPut = new Put(secondaryIndexRow, postAt); // Timestampにデータを追加するときの時間を指定 secondaryIndexPut.add(COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY, serializedData); // バッチ処理のためにリストの格納 List<Row> puts = new ArrayList<Row>(); puts.add(put); puts.add(secondaryIndexPut); // プールからHTableInterfaceを取得 HTableInterface table = hTablePool.getTable(TABLE); try { // バッチ処理でPut table.batch(puts); } catch (InterruptedException e) { // 省略 } finally { table.close(); } } // カテゴリ名の取得 private String getCategoryName(int categoryId) { // 省略 } // ユーザ名の取得 private String getUserName(long userId) { // 省略 } // 各データをbyte[]にシリアライズする public byte [] serialize(long articleId, long userId, String userName, String title, String content, int categoryId, String categoryName, long postAt, long updateAt) { // 省略 } // 記事IDの生成 public long createArticleId() { // 省略 } // RowKeyの作成(最新順)。hash(userId)-userId-0-(Long.MAX_VALUE - postAt)-articleId private byte[] createRow(long userId, long postAt, long articleId) { ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 1 + 8 + 8); // int型 + long型 + byte型 + long型 + long型 buffer.putInt(hash(userId)); // hash(userId) buffer.putLong(userId); // userId buffer.put((byte) 0); // 0 buffer.putLong(Long.MAX_VALUE - postAt); // Long.MAX_VALUE - postAt buffer.putLong(articleId); // articleId return buffer.array(); } // セカンダリインデックス(カテゴリ別)のRowKeyの作成。hash(userId)-userId-1-categoryId-(Long.MAX_VALUE - postAt)-articleId private byte[] createSecondaryIndexRow(long userId, int categoryId, long postAt, long articleId) { ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 1 + 4 + 8 + 8); // int型 + long型 + byte型 + int型 + long型 + long型 buffer.putInt(hash(userId)); // hash(userId) buffer.putLong(userId); // userId buffer.put((byte) 1); // 1 buffer.putInt(categoryId); // categoryId buffer.putLong(Long.MAX_VALUE - postAt); // Long.MAX_VALUE - postAt buffer.putLong(articleId); // articleId return buffer.array(); } // ハッシュ関数 private int hash(long value) { return (int)(value ^ (value >>> 32)); }
上記コードでは、最新順のためのRowのPutオブジェクトとセカンダリインデックスのRowのPutオブジェクトを作成し、最後にbatchによって実行しています。また、Putオブジェクトを作成する際に、Timestampに投稿日時を指定しています。
こうすることで、2つのRowを同時にPutしたことになり、データ不整合が発生しにくくなります。
createRowやcreateSecondaryIndexRowは、スキーマ設計で決めたRowKeyを作成しています。
ブログ記事更新
ブログ記事更新のコードは以下のようになります。
// ブログ記事更新 public void updateArticle(Article article, String newTitle, String newContent) throws IOException { long updateAt = System.currentTimeMillis(); // 更新日時 // 更新後のデータをシリアライズ byte[] serializedData = serialize(article.getArticleId(), article.getUserId(), article.getUserName(), newTitle, newContent, article.getCategoryId(), article.getCategoryName(), article.getPostAt(), updateAt); // Putオブジェクトの作成(最新順) byte[] row = createRow(article.getUserId(), article.getPostAt(), article.getArticleId()); Put put = new Put(row, updateAt); // Timestampにデータを更新するときの時間を指定 put.add(COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY, serializedData); // セカンダリインデックス(カテゴリ別)のPutオブジェクトの作成 byte[] secondaryIndexRow = createSecondaryIndexRow(article.getUserId(), article.getCategoryId(), article.getPostAt(), article.getArticleId()); Put secondaryIndexPut = new Put(secondaryIndexRow, updateAt); // Timestampにデータを更新するときの時間を指定 secondaryIndexPut.add(COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY, serializedData); // バッチ処理のためにリストの格納 List<Row> puts = new ArrayList<Row>(); puts.add(put); puts.add(secondaryIndexPut); // プールからHTableInterfaceを取得 HTableInterface table = hTablePool.getTable(TABLE); try { // バッチ処理でPut table.batch(puts); } catch (InterruptedException e) { // 省略 } finally { table.close(); } }
ブログ記事更新のコードは、ブログ記事投稿のコードとあまり変わりません。更新後のPutオブジェクトを作成し、最後にbatchによって実行しています。
ブログ記事削除
ブログ記事削除のコードは、以下のようになります。
// ブログ記事削除 public void deleteArticle(Article article) throws IOException { long deleteAt = System.currentTimeMillis(); // 削除日時 // Deleteオブジェクトの作成 byte[] row = createRow(article.getUserId(), article.getPostAt(), article.getArticleId()); Delete delete = new Delete(row, deleteAt); // Timestampにデータを削除するときの時間を指定 // セカンダリインデックスのDeleteオブジェクトの作成 byte[] secondaryIndexRow = createSecondaryIndexRow(article.getUserId(), article.getCategoryId(), article.getPostAt(), article.getArticleId()); Delete secondaryIndexDelete = new Delete(secondaryIndexRow, deleteAt); // Timestampにデータを削除するときの時間を指定 // バッチ処理のためにリストの格納 List<Row> deletes = new ArrayList<Row>(); deletes.add(delete); deletes.add(secondaryIndexDelete); // プールからHTableInterfaceを取得 HTableInterface table = hTablePool.getTable(TABLE); try { // バッチ処理でDelete table.batch(deletes); } catch (InterruptedException e) { // 省略 } finally { table.close(); } }
こちらでも、特に難しいことはしておらず、2つのRowに対してDeleteオブジェクトを作成し、batchで実行しています。