SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

CoffeeScriptによるモダンなWebアプリケーション開発

CoffeeScriptベストプラクティス集
Node.jsアプリケーション編(1)

CoffeeScriptによるモダンなWebアプリケーション開発 第3回

  • X ポスト
  • このエントリーをはてなブックマークに追加

配信・購読型(PUB/SUB)

 メッセージパターンとして配信(PUB)と購読(SUB)を利用する場合はリスト13、14のように書きます。1つのPUBソケットは複数のSUBソケットからの接続を受け付けることができます。

[リスト13]配信する側(PUB)
zmq = require 'zmq'

# PUBソケットを作成
pub = zmq.socket 'pub'
# UNIXドメインソケット/tmp/pubsockで接続を待ち受ける
pub.bind 'ipc:///tmp/pubsock', (err) ->
    # bindでエラーが発生した場合
    throw err if err

    # 1秒ごとにメッセージを配信
    setInterval ->
        pub.send "TICKER GOOG 606.80"
    , 1000

    # 5秒ごとにメッセージを配信
    setInterval ->
        pub.send "NEWS 速報"
    , 5000

[リスト14]購読する側(SUB)
zmq = require 'zmq'

# SUBソケットを作成
sub = zmq.socket 'sub'
# 接続先を指定
sub.connect 'ipc:///tmp/pubsock'

# 購読対象を指定(先頭文字列)
sub.subscribe 'TICKER'
sub.subscribe 'NEWS'

# 配信を受け取った時に呼ばれる
sub.on 'message', (data) ->
    console.log "received: #{data}"
購読側実行結果
received: TICKER GOOG 606.80
received: TICKER GOOG 606.80
received: TICKER GOOG 606.80
received: TICKER GOOG 606.80
received: TICKER GOOG 606.80
received: NEWS 速報
received: TICKER GOOG 606.80
received: TICKER GOOG 606.80
received: TICKER GOOG 606.80

 ここでは通信手段にIPCを使っています。IPCは内部でUNIXドメインソケットを使用しており、Linux、UNIX、MacなどのOSで使えます。IPCの代わりにTCPを使う場合はconnect()の引数をipc://エンドポイントからtcp://ホスト:ポート番号に変えるだけでよく、他の修正は必要ありません。この切り替えの手軽さがØMQの大きな特徴の一つです。

 ØMQでは他にもPUSH/PULL、PAIRなどのソケットタイプがサポートされており、これらを組み合わせて多様な構成が作れるようになっています。ØMQはドキュメントが貧弱なのが難点ですが、使い方を一度把握してしまえば非常に便利なライブラリです。より詳しい情報はWeb上の文献などを参考にしてください。

【参考】「Too many open files」エラーが出たら

 ソケットの作成を高速に繰り返すと、システムがソケットを作成できる許容量を超え「Too many open files」といったエラーを出してプロセスが終了してしまうことがあります。このような場合は、一度作成したソケットを一定時間使い回すようにしましょう。特にリクエスト1回につきソケットを1つ作成するようなプログラムは、リクエストの数に比例してソケットの作成数が増えるため、大量のリクエストが来た時にシステムの許容量を超えてしまう危険があります。対策としては、作成したソケットを5秒間など一定の時間使い回すようにすればソケットの数を一定の範囲に抑えることができるので、エラーを回避できます。

AMQP

 AMQP(Advanced Message Queuing Protocol)はØMQと似たメッセージプロトコルですが、ØMQより速度が遅い代わりに高い信頼性を持っています。ØMQはメッセージを仲介するサーバを使いませんが、AMQPではブローカーと呼ばれる仲介サーバが必要となります。AMQPを実装した代表的なサーバとしてRabbitMQがあります。RabbitMQには、メッセージの永続化や進行状態のモニタリングなどの機能があります。

 ØMQが信頼できないというわけではありませんが、ØMQで通信の両端のプロセスが落ちるとメッセージが消失してしまいます。そのような場合にも自前で復帰できる方法を用意していれば問題ありませんが、信頼性が特に重視され、またブローカーを運用する余裕のある場合にはRabbitMQを使うとよいでしょう。

rabbit.jsのインストール

 Node.jsからRabbitMQを使うにはrabbit.js(ライセンスはMozilla Public License 1.1)というモジュールを使います。インストールするにはプロジェクトのディレクトリで次のコマンドを実行します。

$ npm install rabbit.js

リクエスト・レスポンス型(REQ/REP)

 リクエストとレスポンスというメッセージパターンの場合はREQ(クライアント)とREP(サーバ)ソケットを使います。サーバは複数のクライアントからのリクエストを同時に受け付けることができます(リスト15, 16)。

[リスト15]サーバ側(REP)
# localhostのAMQPサーバに接続
context = require('rabbit.js').createContext 'amqp://localhost'

# コンテキストの準備完了を待つ
context.on 'ready', ->
    # サーバソケットを作成
    rep = context.socket 'REP'
    # dataイベントで受け取るデータはUTF-8文字列であるとみなす。
    # setEncoding()を行わない場合はBufferオブジェクトが来る。
    rep.setEncoding 'utf-8'

    # クライアントからリクエストが来た時に呼ばれる
    rep.on 'data', (data) ->
        console.log "received: #{data}"
        # data文字列を逆にして送信
        rep.write data.split('').reverse().join(''), 'utf-8'

    # reverseキューに接続する
    rep.connect 'reverse'
[リスト16]クライアント側(REQ)
# localhostのAMQPサーバに接続
context = require('rabbit.js').createContext 'amqp://localhost'

# コンテキストの準備完了を待つ
context.on 'ready', ->
    # クライアントソケットを作成
    req = context.socket 'REQ'
    # dataイベントで受け取るデータはUTF-8文字列であるとみなす。
    # setEncoding()を行わない場合はBufferオブジェクトとして来る。
    req.setEncoding 'utf-8'

    # reverseキューに接続する
    req.connect 'reverse', ->
        # サーバから結果が来た時に呼ばれる
        req.on 'data', (data) ->
            console.log "result: #{data}"

        # サーバにUTF-8文字列を送信する
        req.write 'helloworld', 'utf-8'

 サーバとクライアントを実行した結果はリスト17_1、17_2です。

[リスト17_1]サーバ側実行結果
received: helloworld
[リスト17_2]クライアント側実行結果
result: dlrowolleh

 ØMQとは違ってRabbitMQには自動再接続の機能はありませんが、ブローカーがキューを処理してくれるため、サーバー側のプログラムよりクライアント側のプログラムを先に実行することができます。

配信・購読型(PUB/SUB)

 配信(PUB)と購読(SUB)というメッセージパターンの場合はリスト18、19のように書きます。1つのPUBソケットは複数のSUBソケットからの接続を同時に処理できます。

[リスト18]配信側(PUB)
# localhostのAMQPサーバに接続
context = require('rabbit.js').createContext 'amqp://localhost'

# コンテキストの準備完了を待つ
context.on 'ready', ->
    # 配信用ソケットを作成
    pub = context.socket 'PUB'
    # eventsエクスチェンジに接続する、エクスチェンジが存在しない場合は自動的に作られる。
    pub.connect 'events', ->
        # 1秒ごとにUTF-8文字列を送信
        setInterval ->
            pub.write 'TICKER GOOG 606.80', 'utf-8'
        , 1000

        # 5秒ごとにUTF-8文字列を送信
        setInterval ->
            pub.write 'NEWS 速報', 'utf-8'
        , 5000
[リスト19]購読側(SUB)
# localhostのAMQPサーバに接続
context = require('rabbit.js').createContext 'amqp://localhost'

# コンテキストの準備完了を待つ
context.on 'ready', ->
    # 購読用ソケットを作成
    sub = context.socket 'SUB'
    # dataイベントで受け取るデータはUTF-8文字列であるとみなす。
    # setEncoding()を行わない場合はBufferオブジェクトが来る。
    sub.setEncoding 'utf-8'

    # データが来た時に呼ばれるコールバック関数を登録
    sub.on 'data', (data) ->
        console.log "受信: #{data}"

    # eventsエクスチェンジに接続する、エクスチェンジが存在しない場合は自動的に作られる。
    sub.connect 'events'

 配信側と購読側を実行すると、購読側にリスト20のように出力されます。

[リスト20]購読側実行結果
受信: TICKER GOOG 606.80
受信: TICKER GOOG 606.80
受信: TICKER GOOG 606.80
受信: TICKER GOOG 606.80
受信: TICKER GOOG 606.80
受信: NEWS 速報
受信: TICKER GOOG 606.80
受信: TICKER GOOG 606.80

 これらのライブラリをうまく利用して、イベント駆動型のプログラムを見通しよく作れるよう工夫してみてください。次回も引き続き、CoffeeScriptでNode.jsアプリケーションを開発する際によく使われる実用的な開発手法を紹介します。

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
CoffeeScriptによるモダンなWebアプリケーション開発連載記事一覧

もっと読む

この記事の著者

飯塚 直(イイヅカ ナオ)

1984年東京都生まれ。 高校時代に趣味でPerlやJavaを使ってプログラミングを始める。 慶応大学湘南藤沢キャンパス卒業後、共同通信社にてニュースサイトの開発などを担当。 その後、面白法人カヤックにてソーシャルゲームの開発などを手がける。 2012年現在、カヤックを退社し個人として活動し...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/6481 2012/04/19 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング