本エントリの内容を最後まで読んでいただくことで、StackStormを用いた開発・運用に関する一通りの知識を習得できるようになると思います。
StackStormの機能拡張
前回の記事の内部アーキテクチャにおいて、外部のサービスやシステムと連携して、処理やワークフローを実行する仕組みを解説しました。これらを実現するための内部コンポーネントであるセンサ、トリガ、アクションは、Packという単位で管理されていたことを思い出してください。
StackStormでは、これらのPackを動的に追加(削除)できる機能によって、柔軟な機能追加(削除)が行えます。またStackStorm Exchangeから、さまざまなサービス・システム向けのPackが提供されており、これらを活用することで、運用をする上で必要となるおおよその機能はカバーできると思います。
しかし、StackStorm Exchangeから取得できるPackに欲しい機能が実装されていなかったり、すでに利用している運用自動化の資産を活用したかったりする場合があるかもしれません。
ここでは、そうした運用のための開発を行うユーザ向けに、PackおよびPackを構成するセンサ、トリガ、アクションを実装する方法を解説します。
例として、ローカルマシンのファイルシステムのディレクトリの変更を監視するセンサ(DirectorySensor)と、ファイル更新が行われた際に引かれるトリガ(changed_file)、および入力されたパラメータをログに吐き出すアクション(output_context)を実装します。
あまり面白みのない仕組みですが、これらをどのように実装するのかを理解するにはちょうどよいと思います。
Packを作る
まずはst2sdkを使ってPackの雛形を作成します。以下の手順でst2sdkをインストールし、オリジナルのPack(mypack)を作成します。
$ sudo apt-get install python-pip ## pip コマンドをインストール $ sudo pip install st2sdk ## st2sdk をインストール $ st2sdk bootstrap mypack ## pack の雛形を作成
st2sdkのサブコマンドbootstrapによって、Pack自体の定義ファイルpack.confやセンサやアクションが参照するパラメータを保存する設定ファイルconfig.yamlなど、最低限必要のファイルが自動生成されます。
なお、以降で解説するコードを含むmypack全体を以下のリポジトリで公開しています。
SensorとTriggerを作る
続いてセンサとトリガを作成します。コードを見る前にセンサに関するソフトウェアの構造について把握したいと思います。以下の図はセンサのソフトウェアアーキテクチャを表します。
上部のmypack.DirectorySensorと書かれた部分がユーザが実装する部分で、外部システムのイベントを検知する処理を実装します。SensorServiceはStackStormの機能で、センサがトリガやデータストア、設定ファイルなどにアクセスするための仕組みを提供しています。具体的には、MongoDBで管理されるデータストアへのアクセスや、トリガを引いてRabbitMQを通してイベント通知を送るためのインターフェースを提供します。
また、StackStormはセンサごとにプロセスを立ち上げます。従って、ユーザが定義したセンサがクラッシュした場合でも、他のセンサやStackStormサービスに対して影響を及ぼすことはありません。
ここから、センサを作成するコードを見ていきます。センサは、メタデータファイルとソースコードから構成されています。以下は冒頭のセンサのメタデータファイルの全文になります。
---
class_name: "DirectorySensor"
entry_point: "directory_sensor.py"
description: "An example sensor to know how to implement it"
poll_interval: 30
trigger_types:
- name: "changed_file"
description: "Trigger which indicates a change of trigger"
payload_schema:
type: "object"
properties:
paths:
type: "string"
time:
type: "string"
status:
type: "string"
trigger_types以下で、このセンサに紐付くトリガ(changed_file)を指定しています。その際、このトリガがどういった出力パラメータをアクションに渡すかを規定したフォーマットpayload_schemaを定義できます。センサはトリガを引く際、ここで規定したパラメータを設定します。ユーザがRuleを記述する際、ここで規定されているフォーマットに基づいて、パラメータの変換設定を行います。
entry_pointではセンサの実装ファイルを指定します。ここで指定するファイルのパスは、mypackのセンサに関連するファイルを格納するディレクトリ/opt/stacksotm/packs/mypack/sensorsからの相対パスで指定します。またclass_nameで後述するセンサを実装したクラスを指定します。
それでは、センサの実装を見ていきます。以下はソースコードdirectory_sensor.pyの抜粋です。
from st2reactor.sensor.base import PollingSensor
class DirectorySensor(PollingSensor):
def setup(self):
self._logger = self._sensor_service.get_logger(__name__)
# initialize directory info
self._cached_dirinfo = self._get_dirinfo()
def poll(self):
current_dirinfo = self._get_dirinfo()
# notify deleted files
for path in (set(self._cached_dirinfo) - set(current_dirinfo)):
self._dispatch_trigger(path, self._cached_dirinfo[path], 'deleted')
# notify created files
for path in (set(current_dirinfo) - set(self._cached_dirinfo)):
self._dispatch_trigger(path, current_dirinfo[path], 'created')
# notify modified files
for path in [x for x in (set(self._cached_dirinfo) & set(current_dirinfo))
if self._cached_dirinfo[x] != current_dirinfo[x]]:
self._dispatch_trigger(path, current_dirinfo[path], 'modified')
# update cache data
self._cached_dirinfo = current_dirinfo
……(略)……
def _get_dirinfo(self):
dirinfo = {}
for target in self._get_sensor_config('directories'):
self._do_get_dirinfo(target, dirinfo)
return dirinfo
……(略)……
def _dispatch_trigger(self, path, time, status):
payload = {
'path': path,
'time': time,
'status': status,
}
self._sensor_service.dispatch(trigger="mypack.changed_file", payload)
センサの実装は大きく2種類あり、メールやWebプッシュ通知などの仕組みによって外部システムのイベントを受動的に受け取るパッシブセンサSensorか、センサ自体が外部システムを参照して能動的にイベントを取得しに行くアクティブセンサPollingSensorのいずれかになります。
今回実装するDirectorySensorは定期的にファイルシステムを確認して差分を確認するため、アクティブセンサPollingSensorを継承します。逆にinotifyなどの仕組みによって、ファイルシステムから送られてくる変更通知を待ち受けるセンサを実装する場合には、パッシブセンサSensorを継承すると良いです。
センサは、プロセスが実行された場合や初期化された場合など、いくつかのコールバックを受け取ることができます。ここでは、センサが初期化された後に1度だけ呼ばれるコールバックメソッドsetupと、メタデータファイルのpoll_intervalパラメータで指定した間隔(単位は秒)ごとに呼び出されるpollメソッドに処理を記述します。
処理の中身は、まず初期化処理においてプライベートメソッド_get_dirinfoを呼び出し、以下に示すmypackの設定ファイルで指定したディレクトリ配下の各ファイルの更新時間を取得する処理_do_get_dirinfoを実行し、結果を返します(_do_get_dirinfoのコードの説明は割愛します)。
### /opt/stackstorm/packs/mypack/config.yaml
---
sensor:
directories:
- /opt/stackstorm
続いて定期的に呼び出されるpollにおいて、同じように_get_dirinfoを呼び出し、前回実行時との差分(作成、削除、更新されたファイル)を取得し、差分があった場合に_dispatch_triggerを呼び出し、トリガ(mypack.changed_file)を引きます。その際、メタデータファイルのpayload_schemaで規定したフォーマットに従って、アクションに渡すパラメータを設定します。
こうして通知されたイベントがMQを経由し、st2rulesengineサービスに渡ります。そして、当該トリガに関連付けられたルールが存在する場合には、それに紐付くアクションがワーカノードで実行されます。
Actionを作る
最後にアクション(output_context)を作成します。アクションもメタデータファイルとソースコードの2つから構成されます。以下にメタデータファイルの全文を記載します。
---
name: output_context
pack: mypack
runner_type: run-python
entry_point: output_context.py
description: 'An example action to know how it works'
enabled: true
parameters:
context:
type: string
default: ''
メタデータファイルの構成は、入門編の最後で解説したWorkflowとほぼ同じです。ただWorkFlowと異なり、このアクションはPythonスクリプトで記述するためにrunner_typeにrun-pythonを指定します(他にもさまざまな形式のアクションを指定できます)。
続いてアクションのソースコードの全文を以下に記載します。
import os
from st2actions.runners.pythonrunner import Action
class OutputContext(Action):
def run(self, context):
output_path = self.config.get('log', None)
if output_path:
try:
with open(output_path, 'a') as file:
file.write(context + '\n')
except IOError as err:
return (False, "IOError occurred (%s)" % (err))
return (True, "This processing succeeded.")
else:
return (False, "The output filepath is invalid.")
当該アクションを実装したクラスOutputContextは、StackStormが用意したベースクラスActionを継承しています。内部では、Packの設定やデータストア、ログにアクセスするための仕組みを提供するActionServiceオブジェクトへの参照を持っており、これらと連携したアクションを簡単に記述することができます。
今回実装するアクションOutputContextでは、実行時に呼び出されるコールバックメソッドrunをオーバーライドしています。メソッドrunでは、メタデータファイルのparametersで指定した値contextを仮引数で受け取ることができます。runの内部では、入力パラメータcontextで受けた値を設定ファイルで指定したログファイルに書き出す処理を行っています。
動作確認
それでは、ここまでに作成したセンサ、トリガ、そしてアクションを動かします。
まずはmypackをデプロイします。mypackのリポジトリを取得してmakeコマンドを実行します。
vagrant@st2-node:~$ git clone https://github.com/userlocalhost/st2-pack-example.git vagrant@st2-node:~$ cd st2-pack-example vagrant@st2-node:~/st2-pack-example$ make
ここでは、上記で作成したmypackを/opt/stackstorm/packs/にデプロイし、mypackの実行環境を構築するためにアクション(packs.setup_virtualenv)を実行します。続いて、st2ctrl reloadを実行することで、デプロイしたmypackの設定ファイルを読み込み、センサ(DirectorySensor)を起動させます。
すべての処理が完了したら、以下のコマンドで正常にセンサ、トリガ、アクションが登録されたことを確認してください。
また、以下のとおりセンサプロセスが正常に動いていることも確認してください。
ここまででmypackを動かす、すべての準備が整いましたので、実際にこれらを動かしてみます。以下のようにDirectorySensorが監視するディレクトリに適当なファイルを作成してください。
vagrant@st2-node:~$ sudo touch /opt/stackstorm/packs/hoge
暫くするとmypackの設定ファイル/opt/stackstorm/packs/mypack/config.yamlのlogパラメータで指定したパスにトリガから送られた値が出力されます。出力先のファイルパスの確認と併せて、出力結果を確認します。
