配信・購読型(PUB/SUB)
メッセージパターンとして配信(PUB)と購読(SUB)を利用する場合はリスト13、14のように書きます。1つのPUBソケットは複数のSUBソケットからの接続を受け付けることができます。
zmq = require 'zmq' # PUBソケットを作成 pub = zmq.socket 'pub' # UNIXドメインソケット/tmp/pubsockで接続を待ち受ける pub.bind 'ipc:///tmp/pubsock', (err) -> # bindでエラーが発生した場合 throw err if err # 1秒ごとにメッセージを配信 setInterval -> pub.send "TICKER GOOG 606.80" , 1000 # 5秒ごとにメッセージを配信 setInterval -> pub.send "NEWS 速報" , 5000
zmq = require 'zmq' # SUBソケットを作成 sub = zmq.socket 'sub' # 接続先を指定 sub.connect 'ipc:///tmp/pubsock' # 購読対象を指定(先頭文字列) sub.subscribe 'TICKER' sub.subscribe 'NEWS' # 配信を受け取った時に呼ばれる sub.on 'message', (data) -> console.log "received: #{data}"
received: TICKER GOOG 606.80 received: TICKER GOOG 606.80 received: TICKER GOOG 606.80 received: TICKER GOOG 606.80 received: TICKER GOOG 606.80 received: NEWS 速報 received: TICKER GOOG 606.80 received: TICKER GOOG 606.80 received: TICKER GOOG 606.80
ここでは通信手段にIPCを使っています。IPCは内部でUNIXドメインソケットを使用しており、Linux、UNIX、MacなどのOSで使えます。IPCの代わりにTCPを使う場合はconnect()の引数をipc://エンドポイントからtcp://ホスト:ポート番号に変えるだけでよく、他の修正は必要ありません。この切り替えの手軽さがØMQの大きな特徴の一つです。
ØMQでは他にもPUSH/PULL、PAIRなどのソケットタイプがサポートされており、これらを組み合わせて多様な構成が作れるようになっています。ØMQはドキュメントが貧弱なのが難点ですが、使い方を一度把握してしまえば非常に便利なライブラリです。より詳しい情報はWeb上の文献などを参考にしてください。
ソケットの作成を高速に繰り返すと、システムがソケットを作成できる許容量を超え「Too many open files」といったエラーを出してプロセスが終了してしまうことがあります。このような場合は、一度作成したソケットを一定時間使い回すようにしましょう。特にリクエスト1回につきソケットを1つ作成するようなプログラムは、リクエストの数に比例してソケットの作成数が増えるため、大量のリクエストが来た時にシステムの許容量を超えてしまう危険があります。対策としては、作成したソケットを5秒間など一定の時間使い回すようにすればソケットの数を一定の範囲に抑えることができるので、エラーを回避できます。
AMQP
AMQP(Advanced Message Queuing Protocol)はØMQと似たメッセージプロトコルですが、ØMQより速度が遅い代わりに高い信頼性を持っています。ØMQはメッセージを仲介するサーバを使いませんが、AMQPではブローカーと呼ばれる仲介サーバが必要となります。AMQPを実装した代表的なサーバとしてRabbitMQがあります。RabbitMQには、メッセージの永続化や進行状態のモニタリングなどの機能があります。
ØMQが信頼できないというわけではありませんが、ØMQで通信の両端のプロセスが落ちるとメッセージが消失してしまいます。そのような場合にも自前で復帰できる方法を用意していれば問題ありませんが、信頼性が特に重視され、またブローカーを運用する余裕のある場合にはRabbitMQを使うとよいでしょう。
rabbit.jsのインストール
Node.jsからRabbitMQを使うにはrabbit.js(ライセンスはMozilla Public License 1.1)というモジュールを使います。インストールするにはプロジェクトのディレクトリで次のコマンドを実行します。
$ npm install rabbit.js
リクエスト・レスポンス型(REQ/REP)
リクエストとレスポンスというメッセージパターンの場合はREQ(クライアント)とREP(サーバ)ソケットを使います。サーバは複数のクライアントからのリクエストを同時に受け付けることができます(リスト15, 16)。
# localhostのAMQPサーバに接続 context = require('rabbit.js').createContext 'amqp://localhost' # コンテキストの準備完了を待つ context.on 'ready', -> # サーバソケットを作成 rep = context.socket 'REP' # dataイベントで受け取るデータはUTF-8文字列であるとみなす。 # setEncoding()を行わない場合はBufferオブジェクトが来る。 rep.setEncoding 'utf-8' # クライアントからリクエストが来た時に呼ばれる rep.on 'data', (data) -> console.log "received: #{data}" # data文字列を逆にして送信 rep.write data.split('').reverse().join(''), 'utf-8' # reverseキューに接続する rep.connect 'reverse'
# localhostのAMQPサーバに接続 context = require('rabbit.js').createContext 'amqp://localhost' # コンテキストの準備完了を待つ context.on 'ready', -> # クライアントソケットを作成 req = context.socket 'REQ' # dataイベントで受け取るデータはUTF-8文字列であるとみなす。 # setEncoding()を行わない場合はBufferオブジェクトとして来る。 req.setEncoding 'utf-8' # reverseキューに接続する req.connect 'reverse', -> # サーバから結果が来た時に呼ばれる req.on 'data', (data) -> console.log "result: #{data}" # サーバにUTF-8文字列を送信する req.write 'helloworld', 'utf-8'
サーバとクライアントを実行した結果はリスト17_1、17_2です。
received: helloworld
result: dlrowolleh
ØMQとは違ってRabbitMQには自動再接続の機能はありませんが、ブローカーがキューを処理してくれるため、サーバー側のプログラムよりクライアント側のプログラムを先に実行することができます。
配信・購読型(PUB/SUB)
配信(PUB)と購読(SUB)というメッセージパターンの場合はリスト18、19のように書きます。1つのPUBソケットは複数のSUBソケットからの接続を同時に処理できます。
# localhostのAMQPサーバに接続 context = require('rabbit.js').createContext 'amqp://localhost' # コンテキストの準備完了を待つ context.on 'ready', -> # 配信用ソケットを作成 pub = context.socket 'PUB' # eventsエクスチェンジに接続する、エクスチェンジが存在しない場合は自動的に作られる。 pub.connect 'events', -> # 1秒ごとにUTF-8文字列を送信 setInterval -> pub.write 'TICKER GOOG 606.80', 'utf-8' , 1000 # 5秒ごとにUTF-8文字列を送信 setInterval -> pub.write 'NEWS 速報', 'utf-8' , 5000
# localhostのAMQPサーバに接続 context = require('rabbit.js').createContext 'amqp://localhost' # コンテキストの準備完了を待つ context.on 'ready', -> # 購読用ソケットを作成 sub = context.socket 'SUB' # dataイベントで受け取るデータはUTF-8文字列であるとみなす。 # setEncoding()を行わない場合はBufferオブジェクトが来る。 sub.setEncoding 'utf-8' # データが来た時に呼ばれるコールバック関数を登録 sub.on 'data', (data) -> console.log "受信: #{data}" # eventsエクスチェンジに接続する、エクスチェンジが存在しない場合は自動的に作られる。 sub.connect 'events'
配信側と購読側を実行すると、購読側にリスト20のように出力されます。
受信: TICKER GOOG 606.80 受信: TICKER GOOG 606.80 受信: TICKER GOOG 606.80 受信: TICKER GOOG 606.80 受信: TICKER GOOG 606.80 受信: NEWS 速報 受信: TICKER GOOG 606.80 受信: TICKER GOOG 606.80
これらのライブラリをうまく利用して、イベント駆動型のプログラムを見通しよく作れるよう工夫してみてください。次回も引き続き、CoffeeScriptでNode.jsアプリケーションを開発する際によく使われる実用的な開発手法を紹介します。