ØMQ
イベント駆動型のプログラムが1つのプロセス内で完結する場合はEventEmitterで事足りますが、異なるプロセス間、あるいは異なるサーバ間でメッセージをやりとりする場合にはEventEmitterを使うことができません。そのような場合、メッセージをやりとりする他の手段の一つとしてØMQを使うことができます。
「ゼロエムキュー」と読む。ZeroMQ、OMQ、zmqなどとも書く。
ØMQを使うと、IPC(プロセス間通信)やTCPなどのプロトコルを透過的に使いながら同じAPIでメッセージをやりとりできます。通信の両端となるソケットどうしをつなぐ手段としては、プロセス内通信、UNIXドメインソケットによるIPC、TCP、PGMプロトコルによるマルチキャスト、の4種類が使えます。これらの違いにかかわらずØMQのAPIは共通となっているので、ソースコードのアドレス指定部分を変えるだけで内部で使用するプロトコルを変更でき、システム全体を柔軟に構成することができます。ØMQはメッセージの仲介サーバが不要で、接続する2つのソケットどうしが協調動作します。何らかのアクシデントによりソケット間の接続が切れた場合には自動的に再接続が行われます。
メッセージのパターンとしては、リクエストとレスポンスの他にPub/Subなどのパターンを使用できます。あらゆる言語のバインディングが存在するため、他の言語で作られたプログラムと連携させることも容易です。
配信側(Publisher)は特定の宛先を指定せずにメッセージを送信し、購読側(Subscriber)は関心のあるメッセージだけを受信するというメッセージパターン。
ØMQのインストール(UNIX)
LinuxやUNIX、Macの場合は、Stable release(安定版)のPOSIX tarballをダウンロードします。
$ tar zxvf zeromq-2.2.0.tar.gz $ cd zeromq-2.2.0 $ ./configure $ make $ make install (必要に応じて管理者権限で実行)
上記のコマンドは、実際にダウンロードしたファイル名に応じて適宜読み替えてください。
モジュールのインストール
Node.jsからØMQを使うためにzeromq.node(MITライセンス)というモジュールをインストールします。インストールするにはプロジェクトのディレクトリで次のコマンドを実行します。
$ npm install zmq
リクエスト・レスポンス型(DEALER/ROUTER)
リクエストとレスポンスというメッセージパターンの場合は、クライアント側にDEALER、サーバ側にROUTERというソケットタイプを使います(リスト6、7)。
zmq = require 'zmq' req = zmq.socket 'dealer' # DEALERソケットを作成 # 127.0.0.1の9000番ポートにTCPで接続する req.connect 'tcp://127.0.0.1:9000' # サーバからメッセージを受信した時に呼ばれるコールバック関数を登録 req.on 'message', (data) -> console.log "クライアント受信: #{data}" # サーバへメッセージを送信 req.send "Hello, World!"
zmq = require 'zmq' rep = zmq.socket 'router' # ROUTERソケットを作成 # 127.0.0.1の9000番ポートで待機 rep.bind 'tcp://127.0.0.1:9000', -> # クライアントからメッセージを受信した時に実行されるコールバック関数を登録 rep.on 'message', (client..., data) -> console.log "サーバ受信: #{data}" # クライアントにメッセージを送信 rep.send [client..., "受け取りました"]
それぞれを実行した結果は、リスト8_1と8_2です。ØMQには自動再接続の機能があるため、サーバよりクライアントを先に起動しても問題ありません。
サーバ受信: Hello, World!
クライアント受信: 受け取りました
これを利用してRPCのような関数実行機能を実装するとリスト9、10のようになります。
zmq = require 'zmq' # メソッドの定義 methods = # 引数の数値を2倍にして返す 'double': (args, callback) -> result = (num * 2 for num in args) callback result # 引数の数値を半分にして返す 'half': (args, callback) -> result = (num / 2 for num in args) callback result # 現在時刻を表す文字列を返す 'time': (args, callback) -> callback new Date() + '' # メソッドを呼び出す callMethod = (method, args, callback) -> if methods[method]? # メソッドが存在する場合 methods[method] args, (result) -> callback status:'ok', result:result else # メソッドが存在しない場合 callback status:'error', error:'NoSuchMethod' # サーバ rep = zmq.socket 'router' # TCPの9000番ポートで接続を待ち受ける rep.bind 'tcp://0.0.0.0:9000', (err) -> throw err if err # クライアントからメッセージが来た時に呼ばれる rep.on 'message', (client..., data) -> # 文字列をJSONに変換 json = JSON.parse data # メソッドを呼び出す callMethod json.method, json.args, (resp) -> # リクエストと同じcbidを付けてクライアントに返す resp.cbid = json.cbid # 文字列に変換して送信 rep.send [client..., JSON.stringify resp]
zmq = require 'zmq' # コールバック関数を保持しておくオブジェクト callbacks = {} # コールバック関数の連番 callbackId = 0 # サーバのメソッドを呼び出す callRemote = (method, args, callback) -> cbid = callbackId++ # あとで呼び出すためにコールバック関数を保持 callbacks[cbid] = callback req.send JSON.stringify method: method args: args cbid: cbid # クライアント req = zmq.socket 'dealer' # 接続先を指定 req.connect 'tcp://127.0.0.1:9000' # サーバからメッセージが来た時に呼ばれる req.on 'message', (resp) -> json = JSON.parse resp # 該当するコールバック関数があれば実行する if callbacks[json.cbid]? callbacks[json.cbid] json.result delete callbacks[json.cbid] # サーバのメソッドを呼び出す callRemote 'double', [10, 20, 30], (nums) -> # サーバからレスポンスが来たら実行される console.log "double result: #{nums.join ', '}" callRemote 'half', [10, 20, 30], (nums) -> console.log "half result: #{nums.join ', '}" callRemote 'time', [], (time) -> console.log "server time: #{time}"
サーバとクライアントを起動すると、クライアントにリスト11のような実行結果が出ます。
double result: 20, 40, 60 half result: 5, 10, 15 server time: Thu Mar 08 2012 11:27:30 GMT+0900 (JST)
メッセージとしては文字列またはBufferオブジェクトしか送信できないので、オブジェクトや配列はJSON.stringify()で文字列に変換してから送信します。また、実際にメッセージの受け取り側のプログラムを実装するときは、JSONでない文字列が来た場合や、想定しているプロパティがメッセージ中に存在しない場合を想定した例外処理を入れておくようにしましょう。
ROUTERソケットは複数のクライアントからの接続を同時に受け付けることができます。一方、DEALERソケットでconnect()を接続先ごとに1回ずつ呼ぶと、メッセージを送信するたびにラウンドロビン(複数の候補の中から1つを選ぶときに均等に順番に選ぶ方式。持ち回りで選ぶこと)で接続先が選ばれるようになります(リスト12)。
req.connect 'tcp://127.0.0.1:9000' req.connect 'tcp://127.0.0.1:9001'
ØMQソケットを閉じるにはclose()メソッドを呼びます。
req.close()