SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

初めてのHBase

HBaseを使って簡易ブログサービスを作ってみよう

初めてのHBase 第3回


  • X ポスト
  • このエントリーをはてなブックマークに追加

実装

 それでは、実際にコードを書いていきましょう。コードが重複している部分が多々ありますが、説明のためにそのままにしています。また、本質でない部分のコードは省略しています。

ブログ記事投稿

 ブログ記事投稿のコードは、以下のようになります。

// ColumnFamily名
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("d");

// Table名
private static final String TABLE = "blog";

// HTablePoolの初期化処理は省略
private final HTablePool hTablePool;

// ブログ記事投稿
public void postArticle(long userId, String title, String content, int categoryId) throws IOException {
  long postAt = System.currentTimeMillis(); // 投稿日時
  long updateAt = postAt; // 更新日時
  String userName = getUserName(userId); // ユーザ名の取得
  String cagegoryName = getCategoryName(categoryId); // カテゴリ名の取得
  long articleId = createArticleId(); // 記事IDの生成

  // データのシリアライズ
  byte[] serializedData = serialize(articleId, userId, userName, title, content, categoryId, cagegoryName, postAt, updateAt);

  // Putオブジェクトの作成(最新順)
  byte[] row = createRow(userId, postAt, articleId);
  Put put = new Put(row, postAt); // Timestampにデータを追加するときの時間を指定
  put.add(COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY, serializedData);
  
  // セカンダリインデックス(カテゴリ別)のPutオブジェクトの作成
  byte[] secondaryIndexRow = createSecondaryIndexRow(userId, categoryId, postAt, articleId);
  Put secondaryIndexPut = new Put(secondaryIndexRow, postAt); // Timestampにデータを追加するときの時間を指定
  secondaryIndexPut.add(COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY, serializedData);

  // バッチ処理のためにリストの格納
  List<Row> puts = new ArrayList<Row>();
  puts.add(put);
  puts.add(secondaryIndexPut);

  // プールからHTableInterfaceを取得
  HTableInterface table = hTablePool.getTable(TABLE);
  try {
    // バッチ処理でPut
    table.batch(puts);
  } catch (InterruptedException e) {
    // 省略
  } finally {
    table.close();
  }
}

// カテゴリ名の取得
private String getCategoryName(int categoryId) {
  // 省略
}

// ユーザ名の取得
private String getUserName(long userId) {
  // 省略
}

// 各データをbyte[]にシリアライズする
public byte [] serialize(long articleId, long userId, String userName, String title, String content, int categoryId, String categoryName, long postAt, long updateAt) {
  // 省略
}

// 記事IDの生成
public long createArticleId() {
  // 省略
}

// RowKeyの作成(最新順)。hash(userId)-userId-0-(Long.MAX_VALUE - postAt)-articleId
private byte[] createRow(long userId, long postAt, long articleId) {
  ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 1 + 8 + 8); // int型 + long型 + byte型 + long型 + long型
  buffer.putInt(hash(userId)); // hash(userId)
  buffer.putLong(userId); // userId
  buffer.put((byte) 0); // 0
  buffer.putLong(Long.MAX_VALUE - postAt); // Long.MAX_VALUE - postAt
  buffer.putLong(articleId); // articleId
  return buffer.array();
}

// セカンダリインデックス(カテゴリ別)のRowKeyの作成。hash(userId)-userId-1-categoryId-(Long.MAX_VALUE - postAt)-articleId
private byte[] createSecondaryIndexRow(long userId, int categoryId, long postAt, long articleId) {
  ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 1 + 4 + 8 + 8); // int型 + long型 + byte型 + int型 + long型 + long型
  buffer.putInt(hash(userId)); // hash(userId)
  buffer.putLong(userId); // userId
  buffer.put((byte) 1); // 1
  buffer.putInt(categoryId); // categoryId
  buffer.putLong(Long.MAX_VALUE - postAt); // Long.MAX_VALUE - postAt
  buffer.putLong(articleId); // articleId
  return buffer.array();
}

// ハッシュ関数
private int hash(long value) {
  return (int)(value ^ (value >>> 32));
}

 上記コードでは、最新順のためのRowのPutオブジェクトとセカンダリインデックスのRowのPutオブジェクトを作成し、最後にbatchによって実行しています。また、Putオブジェクトを作成する際に、Timestampに投稿日時を指定しています。

 こうすることで、2つのRowを同時にPutしたことになり、データ不整合が発生しにくくなります。

 createRowやcreateSecondaryIndexRowは、スキーマ設計で決めたRowKeyを作成しています。

ブログ記事更新

 ブログ記事更新のコードは以下のようになります。

// ブログ記事更新
public void updateArticle(Article article, String newTitle, String newContent) throws IOException {
  long updateAt = System.currentTimeMillis(); // 更新日時
  
  // 更新後のデータをシリアライズ
  byte[] serializedData = serialize(article.getArticleId(), article.getUserId(), 
      article.getUserName(), newTitle, newContent, article.getCategoryId(), article.getCategoryName(), article.getPostAt(), updateAt);
  
  // Putオブジェクトの作成(最新順)
  byte[] row = createRow(article.getUserId(), article.getPostAt(), article.getArticleId());
  Put put = new Put(row, updateAt); // Timestampにデータを更新するときの時間を指定
  put.add(COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY, serializedData);
  
  // セカンダリインデックス(カテゴリ別)のPutオブジェクトの作成
  byte[] secondaryIndexRow = createSecondaryIndexRow(article.getUserId(), article.getCategoryId(), article.getPostAt(), article.getArticleId());
  Put secondaryIndexPut = new Put(secondaryIndexRow, updateAt); // Timestampにデータを更新するときの時間を指定
  secondaryIndexPut.add(COLUMN_FAMILY, HConstants.EMPTY_BYTE_ARRAY, serializedData);

  // バッチ処理のためにリストの格納
  List<Row> puts = new ArrayList<Row>();
  puts.add(put);
  puts.add(secondaryIndexPut);

  // プールからHTableInterfaceを取得
  HTableInterface table = hTablePool.getTable(TABLE);
  try {
    // バッチ処理でPut
    table.batch(puts);
  } catch (InterruptedException e) {
    // 省略
  } finally {
    table.close();
  }
}

 ブログ記事更新のコードは、ブログ記事投稿のコードとあまり変わりません。更新後のPutオブジェクトを作成し、最後にbatchによって実行しています。

ブログ記事削除

 ブログ記事削除のコードは、以下のようになります。

// ブログ記事削除
public void deleteArticle(Article article) throws IOException {
  long deleteAt = System.currentTimeMillis(); // 削除日時
  
  // Deleteオブジェクトの作成
  byte[] row = createRow(article.getUserId(), article.getPostAt(), article.getArticleId());
  Delete delete = new Delete(row, deleteAt); // Timestampにデータを削除するときの時間を指定
  
  // セカンダリインデックスのDeleteオブジェクトの作成
  byte[] secondaryIndexRow = createSecondaryIndexRow(article.getUserId(), article.getCategoryId(), article.getPostAt(), article.getArticleId());
  Delete secondaryIndexDelete = new Delete(secondaryIndexRow, deleteAt); // Timestampにデータを削除するときの時間を指定
  
  // バッチ処理のためにリストの格納
  List<Row> deletes = new ArrayList<Row>();
  deletes.add(delete);
  deletes.add(secondaryIndexDelete);

  // プールからHTableInterfaceを取得
  HTableInterface table = hTablePool.getTable(TABLE);
  try {
    // バッチ処理でDelete
    table.batch(deletes);
  } catch (InterruptedException e) {
    // 省略
  } finally {
    table.close();
  }
}

 こちらでも、特に難しいことはしておらず、2つのRowに対してDeleteオブジェクトを作成し、batchで実行しています。

次のページ
まとめ

修正履歴

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
初めてのHBase連載記事一覧

もっと読む

この記事の著者

鈴木 俊裕(スズキ トシヒロ)

株式会社サイバーエージェント アメーバ事業本部 Ameba Technology Laboratory 2008年4月に株式会社サイバーエージェントに新卒で入社。基盤システムの開発・運用に従事する。 2010年4月にHadoop/Hiveを用いたログ解析基盤の開発・運用を担当する。 2011年4月に、ログ解析、レコメンド、検索エンジンなどを開発するAmeba Technology Laboratoryの立ち上げメンバーとなる。 2011年10月からHBaseを用...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/7128 2013/09/24 19:02

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング