どうやって使うの?
では実際にLazy Queuesを設定したキューを定義してやり、キューに対してメッセージを送る処理を記述してみます。
まずはLazy Queuesを使わない、通常のキューを用いて以下の単純なPub-Subモデルのシステムを記述します。実装に当たり、Ruby(v2.2.2)とRuby版RabbitMQライブラリBunnyを用います。
通常のキュー
まずはじめに、メッセージの送信処理を記述します。以下のコードはRabbitMQサーバ「mq-server」に対して、1KBのメッセージ送る処理になります。
require 'bunny' c = Bunny.new(:host => 'mq-server', :user => 'tsuchinoko', :password => 'passwd') c.start ch = c.create_channel q = ch.queue('basic-queue') begin loop do q.publish('a' * 1024) end rescue Exception => _ ch.close c.close end
コードの中身を簡単に解説すると、3行目で設定したRabbitMQサーバの情報に基づいて、4行目でコネクションを確立させます。次に6行目でChannelと呼ばれるオブジェクトを生成します。ChannelはAMQPで規定されているメッセージブローカー内部での論理的接続で、一つのTCPコネクションで複数のChannelを定義させることができます。またキュー、エクスチェンジの設定・状態はChannelごとに独立します。なのでChannelに対する操作はスレッドセーフであり、安全に並列処理が行えます。
そして7行目で「basic-queue」と名前付けしたキューを生成しています。10-12行目で、このキューに対してサイズ1KBのメッセージを送り続けます。
続いて、受信処理を記述します。
require 'bunny' c = Bunny.new(:host => 'mq-server', :user => 'tsuchinoko', :password => 'passwd') c.start ch = c.create_channel q = ch.queue('default-queue') begin q.subscribe(:block => true) do |delivery_info, properties, data| puts data end rescue Exception => _ ch.close c.close end
7行目まではメッセージ送信処理と同じです。メッセージの受信は10行目のsubscribeで行われます。サーバのキュー「basic-queue」にメッセージが存在すると、ブロック内部の処理が呼ばれます。ブロックで受け取っている3つのパラメータはそれぞれ以下のとおりです。
- delivery_info:どういったアプリケーションから、どういった経路で送られたか、といったメッセージ転送処理に関する情報
- properties:データタイプや永続化されているか、といったメッセージ自体のメタデータ情報
- data:キューに格納されたメッセージの本体
Lazy Queuesを設定したキュー
では、次にLazy Queuesを用いた送信処理を記述します。
Lazy Queuesは、キューを宣言する際にパラメータ「x-queue-mode」に「lazy」を指定することでLazy Queuesを設定できます(デフォルトではLazy Queues非設定の「default」が指定されます)。
以下は、Lazy Queuesを設定したキューを用いたメッセージの送信処理になります。
require 'bunny' c = Bunny.new(:host => 'mq-server', :user => 'tsuchinoko', :password => 'passwd') c.start ch = c.create_channel q = ch.queue('lazy-queue', :arguments => {'x-queue-mode' => 'lazy'}) begin loop do q.publish('a' * 1024) end rescue Exception => _ ch.close c.close end
先に示した、通常のキューの実装との差分は以下のとおりです。
--- basic_queues/sender.rb 2016-02-14 15:25:51.000000000 +0900 +++ lazy_queues/sender.rb 2016-02-14 23:35:36.000000000 +0900 @@ -6,7 +6,7 @@ c.start ch = c.create_channel -q = ch.queue('basic-queue') +q = ch.queue('lazy-queue', :arguments => {'x-queue-mode' => 'lazy'}) begin loop do
RabbitMQでは、作成済みのキューに対してパラメータの変更を行うことはできないため、通常のキューとは別の名前「lazy-queue」を設定しています。すでに作成されているキューと同名のキューに対してLazy Queuesを設定する場合、deleteメソッドによって当該キューを削除してから、再定義してやる必要があります。
同様に受信処理についても、Lazy Queuesを用いた場合とそうでない場合の差分を以下に示します。
--- basic_queues/receive.rb 2016-02-14 23:36:04.000000000 +0900 +++ lazy_queues/receive.rb 2016-02-14 23:36:28.000000000 +0900 @@ -6,7 +6,7 @@ c.start ch = c.create_channel -q = ch.queue('default-queue') +q = ch.queue('lazy-queue', :arguments => {'x-queue-mode' => 'lazy'}) begin q.subscribe(:block => true) do |delivery_info, properties, data|
では実際にこれら2つの処理を動かして、RabbitMQサーバの振る舞いの違いを見てみましょう。