ステップ2:.NET送信アプリの作成
Service Bus Topicとサブスクリプションが作成されたので、Topicに対してデータ送信する簡単な.NETプログラムを作成します。
Service BusのAMQPサポートは、Service Bus .NETクライアントライブラリの最新版で利用可能です。これは、http://nuget.org/packages/WindowsAzure.ServiceBus/で、NuGetを通じて取得できます。Version 2.1.0以上が必要になります。“Install Package WindowsAzure.ServiceBus”と入力すれば、ダウンロードして.NETアプリケーションへ追加できます。
以下のコードは、簡単な.NETコンソールアプリケーションで、ユーザがデータを入力したら、アプリはService Bus .NET APIを使用してユーザが入力したデータを、上記で作成した“scottmessages”Service Bus Topicへ投稿します。
using System; using System.Configuration; using Microsoft.ServiceBus.Messaging; namespace SendToScott { class Program { static void Main(string[] args) { string connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"]; TopicClient topicClient = TopicClient.CreateFromConnectionString(connectionString, "scottmessages"); Console.WriteLine("Type messages you wish to post to the Topic:"); while (true) { Console.Write("> "); string messageText = Console.ReadLine(); topicClient.Send(new BrokeredMessage(messageText)); } } } }
上記のコードは.NETのConnectionManagerクラスを使用して、app.configファイルから構成の設定を引き出します。
私はこの方法を使用して、Service Bus Topicへの接続文字列を取得しています(ハードコードしないように)。
以下は、これを指定したApp.configファイルです。
<?xml version="1.0" encoding="utf-8" ?> <configuration> <startup> <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" /> </startup> <appSettings> <add key="Microsoft.ServiceBus.ConnectionString" value="Endpoint=sb://scottgu-ns.servicebus.windows.net/;SharedSecretIssuer=owner;SharedSecretValue=sSDdaewGUo3/wsaewtjhELlCi1y3SRwjFMX01tz2c/AXw=;TransportType=Amqp" /> </appSettings> </configuration>
Topicを選択して、ポータル下部にあるコマンドバーの“Access Key”ボタンをクリックすると、Windows AzureポータルからService Bus Topicへの接続文字列を取得できます。AMQPを使用するための.NETクライアントライブラリ設定で、“;TransportType=Amqp” を接続文字列に追加していますのでご注意ください。
コンソールアプリを実行
.NETコンソールアプリを実行してみましょう。F5を押すと、コンソールアプリが生成されて、Topicに送信するデータが入力できます。入力のサンプルは以下のとおりです。
上記で入力された各データは、Service Bus Topicに投稿されたので、処理のために設定した3つのサブスクリプションにそれぞれデータのコピーが耐久的にキューに入ります。
ステップ3:Javaアプリリスナーを作成
では、サブスクリプションに接続してデータ処理するJavaアプリを作成しましょう。
Javaのデータ通信標準APIは、JMS(Java Message Service)です。JMSは、基礎となる転送については何も特定していないので、異なるJMS製品がそれぞれのデータ通信ブローカーと通信する異なるプロトコルを使用します。私は、基礎のプロトコルとしてAMQP 1.0を使用するApacheの標準のJMSプロバイダを使用します。このライブラリを使用して、Windows Azure Service Busがオープンの標準JMSプロバイダーになります!
Apache AMQPプロバイダーは、http://people.apache.org/~rgodfrey/qpid-java-amqp-1-0-client-jms.htmlで取得できます。配布アーカイブから以下の4つのJARファイルを、アプリケーションの構築と実行時には、Java CLASSPATHに追加する必要があります。
- geronimo-jms_1.1_spec-1.0.jar
- qpid-amqp-1-0-client-[version].jar
- qpid-amqp-1-0-client-jms-[version].jar
- qpid-amqp-1-0-common-[version].jar
そして、以下のJavaコードを書いて、標準のJMSデータ通信APIを使用してService Busサブスクリプションに接続し、その中でデータ処理を行います。
// ReceiveScottsMessages.java import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; import java.util.Hashtable; public class ReceiveScottsMessages implements MessageListener { public static void main(String[] args) { try { Hashtable<String, String> env = new Hashtable<String, String>(); env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory"); env.put(Context.PROVIDER_URL, "servicebus.properties"); Context context = new InitialContext(env); ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF"); Topic topic = (Topic) context.lookup("EntityName"); Connection connection = cf.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber subscriber = session.createDurableSubscriber(topic, "java"); subscriber.setMessageListener(new ReceiveScottsMessages()); connection.start(); System.out.println("Receiving messages. Press enter to stop."); System.in.read(); System.out.println("Shutting down."); connection.stop(); subscriber.close(); session.close(); connection.close(); } catch (Exception e) { System.err.println("Caught exception. Exiting."); System.exit(1); } } @Override public void onMessage(Message message) { try { System.out.println("Message From Scott > " + ((TextMessage) message).getText()); } catch (JMSException e) { System.err.println("Caught exception receiving message: " + e); } } }
Apache JMSプロバイダーは、簡易ファイルベースのJNDIプロバイダーを使用して、接続の詳細とデータ通信エンティティの論理名と物理名のマッピングを含むJMS “Administered Objects”を構成しています。以下は、Windows Azure Service Bus Topicに対する接続文字列の詳細を埋め込むのに使用しているservicebus.propertiesファイルです。
connectionfactory.SBCF = amqps://owner:sSDdaYGUo3%2FwpewtjhELlCi1y4SSwjFGX01tz2c%2FAXw%3D@scottgu-ns.servicebus.windows.net topic.EntityName = scottmessages
このプロパティファイルは、Service Bus接続文字列からの構成要素を含む“SBCF”と呼ばれるConnectionFactoryを定義しています。
形式は以下のとおりです。
amqps://[username]:[password]@[namespace].servicebus.windows.net
上記の形式では、[username]は発行者名と対応し、[password]はURLエンコードされた発行者キーになります。
発行者キーは、手動でURLエンコードしなければなりません。便利なURLエンコーディングユーティリティは、http://www.w3schools.com/tags/ref_urlencode.aspにあります。
Javaアプリを実行
Javaアプリを実行すると、Service Bus Topic上の“Java”サブスクリプションに接続し、以下の出力を生成します。
Receiving messages. Press enter to stop. Message From Scott > Red Shirts are cool Message From Scott > Cross-platform messaging is so simple with AMQP and Service Bus Message From Scott > Windows Azure Rocks! Shutting down.
.NETを使用してTopicへ送信したデータがシームレスにJavaアプリから消費されているところに注目してください!
SpringやJEEなどの人気のJavaフレームワークは、JMSを使用して異なるデータ通信システムを統合するので、これらのフレームワークを使用してコンポーネントを書いてWindows Service Busを装備したデータ通信システムにし、シームレスに他の言語やフレームワークとも相互運用および統合することができます。