SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

CoffeeScriptによるモダンなWebアプリケーション開発

CoffeeScriptベストプラクティス集
Node.jsアプリケーション編(1)

CoffeeScriptによるモダンなWebアプリケーション開発 第3回

  • X ポスト
  • このエントリーをはてなブックマークに追加

ØMQ

 イベント駆動型のプログラムが1つのプロセス内で完結する場合はEventEmitterで事足りますが、異なるプロセス間、あるいは異なるサーバ間でメッセージをやりとりする場合にはEventEmitterを使うことができません。そのような場合、メッセージをやりとりする他の手段の一つとしてØMQを使うことができます。

※ØMQ

 「ゼロエムキュー」と読む。ZeroMQ、OMQ、zmqなどとも書く。

 ØMQを使うと、IPC(プロセス間通信)やTCPなどのプロトコルを透過的に使いながら同じAPIでメッセージをやりとりできます。通信の両端となるソケットどうしをつなぐ手段としては、プロセス内通信、UNIXドメインソケットによるIPC、TCP、PGMプロトコルによるマルチキャスト、の4種類が使えます。これらの違いにかかわらずØMQのAPIは共通となっているので、ソースコードのアドレス指定部分を変えるだけで内部で使用するプロトコルを変更でき、システム全体を柔軟に構成することができます。ØMQはメッセージの仲介サーバが不要で、接続する2つのソケットどうしが協調動作します。何らかのアクシデントによりソケット間の接続が切れた場合には自動的に再接続が行われます。

 メッセージのパターンとしては、リクエストとレスポンスの他にPub/Subなどのパターンを使用できます。あらゆる言語のバインディングが存在するため、他の言語で作られたプログラムと連携させることも容易です。

※Pub/Sub

 配信側(Publisher)は特定の宛先を指定せずにメッセージを送信し、購読側(Subscriber)は関心のあるメッセージだけを受信するというメッセージパターン。

ØMQのインストール(UNIX)

 LinuxやUNIX、Macの場合は、Stable release(安定版)のPOSIX tarballをダウンロードします。

インストールするコマンド
$ tar zxvf zeromq-2.2.0.tar.gz
$ cd zeromq-2.2.0
$ ./configure
$ make
$ make install  (必要に応じて管理者権限で実行)

 上記のコマンドは、実際にダウンロードしたファイル名に応じて適宜読み替えてください。

モジュールのインストール

 Node.jsからØMQを使うためにzeromq.node(MITライセンス)というモジュールをインストールします。インストールするにはプロジェクトのディレクトリで次のコマンドを実行します。

インストールするコマンド
$ npm install zmq

リクエスト・レスポンス型(DEALER/ROUTER)

 リクエストとレスポンスというメッセージパターンの場合は、クライアント側にDEALER、サーバ側にROUTERというソケットタイプを使います(リスト6、7)。

[リスト6]クライアント(DEALER)側
zmq = require 'zmq'
req = zmq.socket 'dealer'  # DEALERソケットを作成

# 127.0.0.1の9000番ポートにTCPで接続する
req.connect 'tcp://127.0.0.1:9000'

# サーバからメッセージを受信した時に呼ばれるコールバック関数を登録
req.on 'message', (data) ->
    console.log "クライアント受信: #{data}"

# サーバへメッセージを送信
req.send "Hello, World!"
[リスト7]サーバ(ROUTER)側
zmq = require 'zmq'
rep = zmq.socket 'router'  # ROUTERソケットを作成

# 127.0.0.1の9000番ポートで待機
rep.bind 'tcp://127.0.0.1:9000', ->

    # クライアントからメッセージを受信した時に実行されるコールバック関数を登録
    rep.on 'message', (client..., data) ->
        console.log "サーバ受信: #{data}"
        
        # クライアントにメッセージを送信
        rep.send [client..., "受け取りました"]

 それぞれを実行した結果は、リスト8_1と8_2です。ØMQには自動再接続の機能があるため、サーバよりクライアントを先に起動しても問題ありません。

[リスト8_1]サーバ側実行結果
サーバ受信: Hello, World!
[リスト8_2]クライアント側実行結果
クライアント受信: 受け取りました

 これを利用してRPCのような関数実行機能を実装するとリスト9、10のようになります。

[リスト9]RPCサーバ
zmq = require 'zmq'

# メソッドの定義
methods =
    # 引数の数値を2倍にして返す
    'double': (args, callback) ->
        result = (num * 2 for num in args)
        callback result

    # 引数の数値を半分にして返す
    'half': (args, callback) ->
        result = (num / 2 for num in args)
        callback result

    # 現在時刻を表す文字列を返す
    'time': (args, callback) ->
        callback new Date() + ''

# メソッドを呼び出す
callMethod = (method, args, callback) ->
    if methods[method]?  # メソッドが存在する場合
        methods[method] args, (result) ->
            callback status:'ok', result:result
    else  # メソッドが存在しない場合
        callback status:'error', error:'NoSuchMethod'

# サーバ
rep = zmq.socket 'router'
# TCPの9000番ポートで接続を待ち受ける
rep.bind 'tcp://0.0.0.0:9000', (err) ->
    throw err if err
    # クライアントからメッセージが来た時に呼ばれる
    rep.on 'message', (client..., data) ->
        # 文字列をJSONに変換
        json = JSON.parse data
        # メソッドを呼び出す
        callMethod json.method, json.args, (resp) ->
            # リクエストと同じcbidを付けてクライアントに返す
            resp.cbid = json.cbid
            # 文字列に変換して送信
            rep.send [client..., JSON.stringify resp]
[リスト10]RPCクライアント
zmq = require 'zmq'

# コールバック関数を保持しておくオブジェクト
callbacks = {}
# コールバック関数の連番
callbackId = 0

# サーバのメソッドを呼び出す
callRemote = (method, args, callback) ->
    cbid = callbackId++
    # あとで呼び出すためにコールバック関数を保持
    callbacks[cbid] = callback
    req.send JSON.stringify
        method: method
        args:   args
        cbid:   cbid

# クライアント
req = zmq.socket 'dealer'
# 接続先を指定
req.connect 'tcp://127.0.0.1:9000'
# サーバからメッセージが来た時に呼ばれる
req.on 'message', (resp) ->
    json = JSON.parse resp
    # 該当するコールバック関数があれば実行する
    if callbacks[json.cbid]?
        callbacks[json.cbid] json.result
        delete callbacks[json.cbid]

# サーバのメソッドを呼び出す
callRemote 'double', [10, 20, 30], (nums) ->
    # サーバからレスポンスが来たら実行される
    console.log "double result: #{nums.join ', '}"

callRemote 'half', [10, 20, 30], (nums) ->
    console.log "half result: #{nums.join ', '}"

callRemote 'time', [], (time) ->
    console.log "server time: #{time}"
    

 サーバとクライアントを起動すると、クライアントにリスト11のような実行結果が出ます。

[リスト11]クライアントの実行結果
double result: 20, 40, 60
half result: 5, 10, 15
server time: Thu Mar 08 2012 11:27:30 GMT+0900 (JST)

 メッセージとしては文字列またはBufferオブジェクトしか送信できないので、オブジェクトや配列はJSON.stringify()で文字列に変換してから送信します。また、実際にメッセージの受け取り側のプログラムを実装するときは、JSONでない文字列が来た場合や、想定しているプロパティがメッセージ中に存在しない場合を想定した例外処理を入れておくようにしましょう。

 ROUTERソケットは複数のクライアントからの接続を同時に受け付けることができます。一方、DEALERソケットでconnect()を接続先ごとに1回ずつ呼ぶと、メッセージを送信するたびにラウンドロビン(複数の候補の中から1つを選ぶときに均等に順番に選ぶ方式。持ち回りで選ぶこと)で接続先が選ばれるようになります(リスト12)。

[リスト12]connect()を複数回呼ぶとsend()の送信先が分散される
req.connect 'tcp://127.0.0.1:9000'
req.connect 'tcp://127.0.0.1:9001'

 ØMQソケットを閉じるにはclose()メソッドを呼びます。

closeメソッド
req.close()

次のページ
AMQP

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
CoffeeScriptによるモダンなWebアプリケーション開発連載記事一覧

もっと読む

この記事の著者

飯塚 直(イイヅカ ナオ)

1984年東京都生まれ。 高校時代に趣味でPerlやJavaを使ってプログラミングを始める。 慶応大学湘南藤沢キャンパス卒業後、共同通信社にてニュースサイトの開発などを担当。 その後、面白法人カヤックにてソーシャルゲームの開発などを手がける。 2012年現在、カヤックを退社し個人として活動し...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/6481 2012/04/19 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング