- 前回の記事:「データプラットフォーム「IRIS」を、まずは動かしてみよう!」
MQTTプロトコル
MQTTはMessage Queuing Telemetry Transportの略で、TCPベースのプロトコルです。IoTシステムで利用される理由としては、
- 軽量であり、比較的小さいサイズのデータを高頻度で送受信するのに適している
- 電力消費量に制約のあるデバイスでも動作できる
- Pub/Sub型の通信により、必要なデバイスやアプリケーションが、必要なデータだけを送受信することができる
などが挙げられます。
ここで、MQTTで規定されているPub/Sub型の通信について簡単に説明します。
Pub/Sub型の通信では、次の3つの役割を持つコンポーネントがあります。
- Publisher:データを送信(Publish)するコンポーネント。送信するデータには、Topic属性をつける。Topicの値は、Subscriberが受信するデータを選択(フィルター)するために使用される。
- Subscriber:データを受信するコンポーネント。受信したいTopicをあらかじめBrokerに登録(Subscribe)しておく。
- Broker:Publisherから送信されたデータを、そのデータについているTopicをSubscribeしているSubscriberに送る機能を持つ。
以上の概要を次の図に示します。
図の左側、PublisherはTopicとともにデータをBrokerに送信します。図の右側のSubscriberは、BrokerにTopicを登録しておき、実際にそのTopicに合致するデータがPublishされたら、Brokerからそのデータを受信します。
図に示されている通り、Topicは「/」で区切られた階層構造を持っています。Publisherは、DevA/Tempのように送信するデータのTopicを指定してPublishします。Subscriberは、受信したいTopicをDevA/Tempのように指定してSubscribeすることもできますし、DevB/#のように、ワイルドカード(#)を使用して、DevB以下すべてのTopic(図の例では、DevB/Temp、DevB/Speed)のデータをSubscribeすることもできます。
MQTT Brokerは、オープンソースでいくつかのソフトウェアが公開されています。今回の連載では、Eclipse Mosquittoを利用します。Eclipse Mosquittoはdockerイメージでも提供されているので、今回の構成ではeclipse-mosquittoというイメージをpullして使用するように定義しています。定義は、docker-compose.ymlのmqttbrokerセクションに記述しているので確認してみてください。
IRISの環境設定
では、いよいよIRISのセットアップをしましょう。前回でgit pullを行ったディレクトリ(本稿ではDriveDemo)に移動してください。そして、前回で実際に動かしてみた方は、コンテナの停止、もしくは削除から始めてください。
コンテナを停止する場合:
>docker-compose -f docker-compose-demo.yml stop
コンテナを削除する場合:
>docker-compose -f docker-compose-demo.yml down
次に今回の実習用のコンテナを起動します。
>docker-compose up –-build -d
このコマンドにより、IRISの実習用のイメージがビルドされ、iris-drive-playという名前のコンテナとして立ち上がります。このイメージは、InterSystemsがDocker Hubで提供しているInterSystems IRIS Data Platform, Community Editionをベースにして、実習に必要なOpenJDKの環境を組み込んだものです。
全体図
今回はIRISでいくつかの設定を行います。まず全体図を示します。
もし体験を進める途中で何をしているのか分かりにくくなった場合は、この図に戻って全体を確認してもらえればと思います。
管理ポータルへログイン
前回紹介した管理ポータルにログインします。
http://localhost:52003/csp/sys/UtilHome.csp
今回は、パスワードが初期状態になっているので、ユーザ名:「SuperUser」、パスワード:「SYS」でログインします。
ログインすると、パスワードを変更する画面が表示されるので、パスワードを変更してください。本稿では、driveuserを使用することにします。
ネームスペースの作成
管理ポータルにログインできたら、最初にネームスペースを作成します。IRISでは、ネームスペースという単位で、データベース、プログラム、インターオペラビリティ機能などを管理します。ここでは、今回の実習で使用するネームスペースを1つ作成します。以下の図のとおり、管理ポータルのメニューから、[システム管理]→[構成]→[システム構成]→[ネームスペース]の順に選択して[実行]を押してください。
そうすると、存在するネームスペースの一覧が表示されます。画面の上部にある[新規ネームスペース作成]をクリックします(次図)。
表示された画面で、ネームスペース名:「drive」と入力し、[グローバルのための既存のデータベースを選択]欄右側の[新規データベース作成]をクリックします(次図)。
データベースの名前、ディレクトリの2か所にdriveと入力します(次図)。
[完了]ボタンを押すと元の画面に戻ります。今度は[ルーチンのための既存のデータベースを選択]でドロップボックスをクリックし、今作成した「DRIVE」を選択して、最後に画面上部の[保存]を押します(次図)。
以上の操作により、ネームスペースの環境構築が行われます(数秒)。最後に、画面最下部の[閉じる]を押してください。これでネームスペースの作成は完了です。
インターオペラビリティの設定
ネームスペースが作成できたら、次にインターオペラビリティの設定に移ります。設定作業が続きますが、もう少しご辛抱ください。
ネームスペース作成の画面から、左上の[システム]をクリックし、管理ポータルのメイン画面に戻ります。そこから、[Interoperability]をクリックし、「プロダクションに使用可能なネームスペース」に、先ほど作成した「DRIVE」を選択します(次図)。
そこで[構成]→[認証情報]を選択します。この画面は、IRISが外部システムとやり取りする際に必要な認証情報を事前に定義するためのものです。今回は、後で紹介するJavaビジネスホストを設定するための認証情報をここで入力します。次図のように、ID:「SuperUser」、ユーザ名:「SuperUser」、パスワード:「driveuser(管理ポータルログイン時に設定したパスワード)」を入力し、保存ボタンを押します。
保存が完了したら、左上メニューの[Interoperability]をクリックします。
次は、今回MQTTプロトコルのペイロード(データの本体)で使用するXMLのスキーマ定義をIRISにセットアップします。次の図のように、[Interoperability]→[相互運用]→[XML]→[XMLスキーマ構造]の順にクリックします。
そこで[インポート]ボタンを押すとファイル選択ダイアログが表示されるので、「/projects/pkg/mqtt_schema_driverecord.xsd」を選び[OK]を押します。すると次図のダイアログが出るので、赤で示したマークをクリックします。
「message」をクリックすると、XMLスキーマが表示されます。スキーマが階層構造になっています。項目3の「DriveRecord」をクリックすると展開され、次の図のようなタグが表示されることが確認できます。
プロダクションの作成
プロダクションとは、IRISのインターオペラビリティ機能の実行環境です。プロダクションにはインターオペラビリティ機能を実現するためのコンポーネントが含まれ、プロダクションを起動することによって、各コンポーネントに対しOSの常駐プロセスが割り当てられます。
プロダクションに含まれるコンポーネントには以下のような種類があります。
- ビジネス・サービス:外部システムからデータを受け取るなどのイベントを検知し、一連のインターオペラビリティ機能を開始させるコンポーネントです。例えば、MQTT Brokerからデータが送られてくる場合、ビジネス・サービスがそのデータを受信し、他のコンポーネントへのメッセージを生成します。また、RDBMSに一定間隔でSELECT文を発行したり、特定のディレクトリにファイルが置かれることをポーリングしたりする機能も持ちます。
- ビジネス・プロセス:ビジネス・サービスなどプロダクション内の他のコンポーネントからメッセージを受け取り、定義されたビジネスロジックを実行するコンポーネントです。ビジネスロジックの定義には、BPL(Business Process Language)を用います。IRISには、BPLをグラフィカルに編集する機能があります。
- ビジネス・オペレーション:プロダクション内の他のコンポーネントからメッセージを受け取り、1つの機能を実行します。例えば、IRIS内部のデータベースにアクセスしたり、外部システムにデータを送信したりするなどの機能を実行します。
では、今回のアプリケーションを実現するためのプロダクションを作成してみましょう。管理ポータル上部の[ホーム]をクリックしホーム画面に戻り、[Interoperability]→[構成]→[プロダクション]をクリックしてください(次図)。
そうするとプロダクション構成画面が表示されるので、次の図のように[新規]ボタンをクリックします。
ダイアログが表示されたら、パッケージ:「DriveDemo」、プロダクション名:「Production」を入力し[OK]を押します(次図)。
これで、プロダクションが新しく作成されました。なお、プロダクションはIRISでクラスとして定義され、先ほど入力したDriveDemo.Productionがクラス名になります。
Javaゲートウェイ
ではいよいよ、プロダクションにコンポーネントを追加していきます。まずは今回のテーマであるMQTTプロトコルでデータを受信するビジネス・サービスを作成します。
IRISはSOAPやxDBCなどさまざまなプロトコルやデータ形式をアダプタと呼ばれるクラスによってサポートしています。しかし、現時点でMQTTアダプタは用意されていません(現在MQTTアダプタを開発中)。
MQTTに限らず、世の中には数多くの標準プロトコルが存在し、また、場合によっては標準ではない通信手順でシステム間連携を行う場面があります。このような状況に対応するため、IRISはJavaゲートウェイという仕組みを開発者の皆さんに提供しています。IRISのビジネス・サービスで定められたメソッドをJavaでプログラムすることによって、ビジネス・サービスの機能を拡張することができます。
Javaコード
MQTTのデータを受信するためのJavaのプログラムは、「IRIS-MQTT/projects/srcs/java/src/com/intersystems/drivedemo/MqttBS.java」にあります。以下では、プログラムのキーとなる部分について紹介します。
ライブラリのインポート
IRISのビジネス・サービスとしての機能を継承するために次のクラスをインポートします。
import com.intersystems.gateway.bh.BusinessService; import com.intersystems.gateway.bh.Production; import com.intersystems.gateway.bh.Production.Severity;
MQTTの通信を行うためにEclipse Pahoのクラスをインポートします。
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
なお、これらのクラスの実体は、「IRIS-MQTT/projects/srcs/java/lib」にJARファイルとして置いてあります。
クラス定義
クラス定義では、IRISのBusinessServiceインターフェース、PahoのMqttCallbackインターフェースを実装するよう宣言します。
public class MqttBS implements BusinessService, MqttCallback {
SETTINGS変数には、IRISのプロダクションから設定可能な項目を列挙します。
public static final String SETTINGS = "MQTTBroker,MQTTClientName,MQTTTopicRoot,LogFile";
メソッド
ビジネス・サービス起動時に呼び出されるOnInit()メソッドです。
@Override public boolean OnInit(Production prod) throws Exception { // (1) try { production=prod; mqttBroker = production.GetSetting("MQTTBroker"); // (2) mqttClientName = production.GetSetting("MQTTClientName"); mqttTopicRoot = production.GetSetting("MQTTTopicRoot"); // (3) mqttClient = new MqttClient(mqttBroker, mqttClientName); // (4) mqttConnOpts = new MqttConnectOptions(); mqttConnOpts.setCleanSession(true); mqttClient.connect(mqttConnOpts); // (5) production.LogMessage("Successfully connected to MQTT broker: "+mqttBroker,Severity.INFO); String[] myTopics={this.mqttTopicRoot}; mqttClient.subscribe(myTopics); // (6) production.LogMessage("Subscribed to MQTT topic: "+mqttTopicRoot,Severity.TRACE); mqttClient.setCallback(this); // (7) production.LogMessage("Successfully connected to MQTT broker: "+mqttBroker,Severity.INFO); return true; } catch (Exception e) { //System.out.println(e.toString()); production.LogMessage(e.toString(), Severity.ERROR); production.SetStatus(Production.Status.ERROR); } return false; }
- (1)OnInitの引数にはProductionクラスの変数がIRISから渡されます。この変数からプロダクションの設定値などにアクセスできます。
- (2)MQTT Brokerへの接続URLを取得します。この値は、IRISのプロダクションから設定します。
- (3)SubscribeするTopicを指定します。この値は、IRISのプロダクションから設定します。
- (4)MqttClientクラスのインスタンスを作成します。
- (5)MQTT Brokerに接続します。
- (6)MQTT BrokerにSubscribeするTopicを登録します。
- (7)コールバックを処理するインスタンスにthisを指定します。
MQTT Brokerからデータが到着した時に呼び出されるコールバックハンドラ messageArrived()です。
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { // (1) production.LogMessage("java, messageArrived", Severity.TRACE); production.LogMessage("java, topic: "+topic+"; Message:"+message.toString(),Production.Severity.TRACE); Timestamp timestamp = new Timestamp(System.currentTimeMillis()); Instant ts = timestamp.toInstant(); String xmlMessage="<message><type>"+topic+"</type><timestamp>"+ts+"</timestamp><content>"+message.toString()+"</content></message>"; // (2) production.LogMessage("java, xmlMessage:"+xmlMessage,Severity.TRACE); production.SendRequest(xmlMessage); // (3) }
- (1)MQTT Brokerから到着したデータが引数messageに、Topicが引数topicに格納されて呼び出されます。
- (2)受信したデータのTopic、時刻、内容をXMLのタグ付けします。このXMLの形式は、先ほどIRISに読み込ませたXMLスキーマで定義したものに合わせてあります。
- (3)XML形式のデータをリクエストメッセージとして送信します。このリクエストメッセージが「どこに」送られるかなどは後に説明します。
このクラスをコンパイルした.classは、「IRIS-MQTT/projects/srcs/java/lib/IRIS-MQTT-Client.jar」にアーカイブします。
ビジネス・サービスの作成
作成したJavaのプログラムをIRISのビジネス・サービスとして登録します。管理ポータルのホーム画面から、[Interoperability]→[構成]→[プロダクション]をクリックし、画面左「サービス」と書かれた横の+マークをクリックします(次図)。
次の図のように、「サービスクラス」から「EnsLib.JavaGateway.Initiator」を選択し、「有効にする」にチェックを入れ、[OK]ボタンを押します。
そうすると、ビジネス・サービスに「EnsLib.JavaGateway.Initiator」が追加されます。そのコンポーネントをクリックし、画面右側「設定」から「追加の設定」を展開、「クラスパス」に「/projects/srcs/java/lib/org.eclipse.paho.client.mqttv3-1.2.0.jar」と入力し[適用]を押します(次図)。
次に、[Interoperability]→[構築]→[Javaビジネス・ホスト]をクリックします(次図)。
その画面の[アクティブなプロダクション]から「DriveDemo.Production」を選び、「プロダクションを開始しますか?」とダイアログで聞かれたら[OK]を押します。
次に、Javaビジネス・ホストの設定を行います。図のとおり、Jarファイルのパスに「/projects/srcs/java/lib/IRIS-MQTT-Client.jar」、Javaクラスの名前に「com.intersystems.drivedemo.MqttBS(ビジネス・サービス)」、デフォルト認証情報に「SuperUser」、インカミング・データのフォーマットに「XML」を指定し「生成」ボタンをクリックします。
以上で、Javaで記述したビジネス・サービスが作成できました。
ビジネス・サービスをプロダクションに追加
設定もようやく最終段階になります。先ほど作成したビジネス・サービスをプロダクションに追加します。
管理ポータルのホーム画面から、[Interoperability]→[構成]→[プロダクション]と選択し、プロダクションの管理画面に移動します。そこで、先ほどと同様、「サービス」と書かれた横の+マークをクリックします。
ダイアログで、次の図のように、サービスクラスに 「JBH.Com.Intersystems.Drivedemo.MqttBS」を選択、サービス名に「MQTTIn」と入力、「有効にする」にチェックを入れ[OK]を押します。
また、同様に「プロセス」と書かれた横の+マークをクリックし、次の図のとおり、ビジネス・プロセス・クラスに「EnsLib.MsgRouter.VDoc.RoutingEngine」を選択し、ビジネス・プロセス名に「RoutingEngine」と入力して、「ルールの自動作成」にチェック。新規ルールパッケージに「DriveDemo」と入力し、「有効にする」にチェックを入れて[OK]を押します。警告が出ますが、ここではOKを押してください。ビジネス・プロセスについては次回以降で解説します。
最後にMQTTInの設定を行います。ターゲット構成名に「RoutingProcess」を選択して、Docスキーマ・カテゴリに「mqtt_schema_driverecord」を選択し、MQTTBrokerに「tcp://mqttbroker:1883」と入力。MQTTClientNameに「IRISClient」と入力し、MQTTTopicRootに「DriveDemo/#」と入力して、[適用]をクリックします(次図)。
ここで、MQTTBroker、MQTTClientName、MQTTTopicRootは、Javaのプログラムに渡される項目であることに注意してください。
そして、設定を有効にするために、画面上部中程の[停止する]を押してから[開始する]を押して、プロダクションを再起動してください(次図)。
以上で、MQTTプロトコルでデータを受信するプロダクションの設定が完了しました。
実行
ここまで設定・作成したアプリケーションを実行してみましょう。
前回と同様、DataGeneratorディレクトリに移動し、車載器データを生成するプログラムを動かします。
>cd DataGenerator Linux, Mac OSの場合 >./run.sh Windowsの場合 >run.bat
管理ポータルのホームから[Interoperatility]→[表示]→[メッセージ]とクリックすると、メッセージの一覧が表示されます。そのうちの一つを確認してみてください。次の図のようなメッセージが流れていることが分かります。
今回作成したのは、MQTTプロトコルで受信したデータをXMLのメッセージとしてJavaビジネス・サービスがメッセージに変換し、それを仮に作成したRoutingProcess(ビジネス・プロセス)に送信するところまでですが、その様子が表示されています。
まとめ
今回は、IoTで利用されるMQTTプロトコルをIRISから利用する手順をご紹介しました。IRISのインターオペラビリティ機能を使用するために、プロダクションと呼ばれる実行環境を作成し、そこにビジネス・サービスやビジネス・プロセスといったコンポーネントを追加することで、機能を実装していく流れを説明しました。
また、MQTT Brokerとの接続のためにJavaのプログラムを書き、それをビジネス・サービスに組み込む方法をご紹介しました。
次回は、Visual Studio Codeを使ってIRISにクラスを定義する方法を紹介する予定です。
参考
インターシステムズでは、開発者の皆様にIRISを知っていただくために、いろいろな機能を紹介する短い動画を作成しています(日本語字幕付き)。
インターオペラビリティ機能は以下のページで説明されています。ぜひご覧ください。