5. Flumeのセットアップ
ここまでで、SolrとBananaのセットアップが完了しました。ここからは、データ可視化プラットフォームのETL(Extract/Transform/Load)ツールの役割をするFlumeのインストールに移ります。
5.1 Flumeのインストール
FlumeはASFのプロジェクトサイト、またはアーカイブサイトからダウンロード可能です。ホームディレクトリ($HOME)下にflumeディレクトリを作成し、Flumeをダウンロードします。
$ mkdir $HOME/flume $ wget -P $HOME/flume http://archive.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
ダウンロードした圧縮ファイルを展開するとインストールは完了です。環境変数$FLUME_HOMEの設定も併せて行います。
$ tar xvzf $HOME/flume/apache-flume-1.5.2-bin.tar.gz -C $HOME/flume $ export FLUME_HOME=$HOME/flume/apache-flume-1.5.2-bin
5.2 差分ファイルのダウンロード
今回、FlumeからSolrへデータを送ると同時に、Morphlineを使用してデータの加工も行いたいと思います。しかし、Flume-1.5.2にはMorphlineを使ってデータを加工したり、Solrへデータを送信したりするのに必要なライブラリが同梱されていません。下記コマンドを実行し、今回必要なJARファイルをダウンロードします。
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/net/sf/saxon/Saxon-HE/9.5.1-5/Saxon-HE-9.5.1-5.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/james/apache-mime4j-core/0.7.2/apache-mime4j-core-0.7.2.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/james/apache-mime4j-dom/0.7.2/apache-mime4j-dom-0.7.2.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/typesafe/config/1.2.1/config-1.2.1.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/httpcomponents/httpmime/4.2.1/httpmime-4.2.1.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/maxmind/db/maxmind-db/1.0.0/maxmind-db-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.1/jackson-core-2.5.1.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.1/jackson-databind-2.5.1.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.1/jackson-annotations-2.5.1.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-hadoop-compatibility/1.0.0/kite-hadoop-compatibility-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-avro/1.0.0/kite-morphlines-avro-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-core/1.0.0/kite-morphlines-core-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-core/1.0.0/kite-morphlines-hadoop-core-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-parquet-avro/1.0.0/kite-morphlines-hadoop-parquet-avro-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-rcfile/1.0.0/kite-morphlines-hadoop-rcfile-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-sequencefile/1.0.0/kite-morphlines-hadoop-sequencefile-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-json/1.0.0/kite-morphlines-json-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-maxmind/1.0.0/kite-morphlines-maxmind-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-metrics-servlets/1.0.0/kite-morphlines-metrics-servlets-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-saxon/1.0.0/kite-morphlines-saxon-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-solr-cell/1.0.0/kite-morphlines-solr-cell-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-solr-core/1.0.0/kite-morphlines-solr-core-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-tika-core/1.0.0/kite-morphlines-tika-core-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-tika-decompress/1.0.0/kite-morphlines-tika-decompress-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-twitter/1.0.0/kite-morphlines-twitter-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-useragent/1.0.0/kite-morphlines-useragent-1.0.0.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-core/3.0.2/metrics-core-3.0.2.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-healthchecks/3.0.2/metrics-healthchecks-3.0.2.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-json/3.0.2/metrics-json-3.0.2.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-jvm/3.0.2/metrics-jvm-3.0.2.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-servlets/3.0.2/metrics-servlets-3.0.2.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/noggit/noggit/0.7/noggit-0.7.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/solr/solr-core/4.10.4/solr-core-4.10.4.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/solr/solr-solrj/4.10.4/solr-solrj-4.10.4.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/tika/tika-core/1.5/tika-core-1.5.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/tika/tika-parsers/1.5/tika-parsers-1.5.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/tika/tika-xmp/1.5/tika-xmp-1.5.jar $ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar
Morphlineの中でGrokという正規表現に似た方法を用いて、データを抽出する機能を使用します。そのGrokで必要な設定ファイルをダウンロードします。
$ mkdir $FLUME_HOME/conf/grok-dictionaries $ wget -P $FLUME_HOME/conf/grok-dictionaries https://raw.githubusercontent.com/cloudera/cdk/master/cdk-morphlines/cdk-morphlines-core/src/test/resources/grok-dictionaries/grok-patterns
httpdのログには接続元のIPアドレスが含まれます。そのIPアドレスからどの地域からアクセスしているかを可視化できるように準備します。MaxMind社(以下「MaxMind」)が無償で提供しているIPアドレスと地域情報をマッピングしたデータベースがありますので、同社のページからダウンロードしconfディレクトリへ展開しましょう。
$ wget -P $FLUME_HOME/conf http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz $ gunzip $FLUME_HOME/conf/GeoLite2-City.mmdb.gz
5.3 Flumeの設定
次に、Flumeの設定ファイルの準備をします。テンプレートファイルがありますので、flume-conf.propertiesとflume-env.shの2つをコピーします。
$ cp $FLUME_HOME/conf/flume-conf.properties.template $FLUME_HOME/conf/flume-conf.properties $ cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh
まず、flume-env.shを編集します。JAVA_OPTSでヒープサイズを大きくしましょう。Flumeではデフォルトが20MBとなっており、大きなクラスやファイルをロードするだけでOutOfMemoryExceptionが発生してしまいます。また、Flumeの設定ファイルが置かれているディレクトリをFLUME_CONF_DIRで指定しておくとよいでしょう。リスト4にflume-env.shの内容を記載します。
$ vi $FLUME_HOME/conf/flume-env.sh
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced # during Flume startup. # Enviroment variables can be set here. #JAVA_HOME=/usr/lib/jvm/java-6-sun # Give Flume more memory and pre-allocate, enable remote monitoring via JMX #JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote" JAVA_OPTS="-Xms256m -Xmx512m" # Note that the Flume conf directory is always included in the classpath. #FLUME_CLASSPATH="" FLUME_CONF_DIR=$FLUME_HOME/conf
続いてflume-conf.propertiesを編集しましょう。FlumeのAgentアーキテクチャは図10のようにSource、Channel、Sinkの3つのコンポーネントに細かく分かれています。Sourceは外部データソースからデータを読み込む、ETLのExtract(抽出)にあたる部分です。ChannelはSourceから流れてきたデータを一時的にためておくキューの役割を担います。Sinkが最終的なDWHへデータを出力するETLのLoad(ロード)にあたる役割をします。また、FlumeのSinkには、Morphlineという入力データの加工を行う、ETLのTransform(加工・変換)の役割をする機能を提供するものもあります。
リスト5にflume-conf.propertiesの内容を記載します。
コンポーネント | 役割 |
---|---|
Source | 外部データソースからのデータ抽出します。 |
Channel | 抽出したデータを一時的に貯めておくキューの役割をします。 |
Sink | DWHへデータをロードします。 |
Morphline | データを必要に応じて加工・変換を行います。 |
$ vi $FLUME_HOME/conf/flume-conf.properties
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' agent.sources = execSrc agent.channels = memoryChannel agent.sinks = solrSink # For each one of the sources, the type is defined agent.sources.execSrc.type = exec agent.sources.execSrc.command = tail -c 0 -f /tmp/access.log # The channel can be defined as follows. agent.sources.execSrc.channels = memoryChannel # Each sink's type must be defined agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink agent.sinks.solrSink.morphlineFile = /Users/mosuka/flume/apache-flume-1.5.2-bin/conf/morphline.conf #Specify the channel the sink should use agent.sinks.solrSink.channel = memoryChannel agent.sinks.solrSink.batchSize = 1000 agent.sinks.solrSink.batchDurationMillis = 1000 # Each channel's type is defined. agent.channels.memoryChannel.type = memory # Other config values specific to each type of channel(sink or source) # can be defined as well # In this case, it specifies the capacity of the memory channel agent.channels.memoryChannel.capacity = 100000 agent.channels.memoryChannel.transactionCapacity = 100000
agent.sinks.solrSink.morphlineFileで指定しているパスは筆者の環境になります。ご自身で適宜パスを修正してください。
Sourceの設定は、/tmp/access.logに取得したいログファイルが出力されていると想定し、tailコマンドを実行してログファイルの差分を読み込むようになっています。可視化したいログファイルのパスに併せて変更するとよいでしょう。
Sinkの設定は、Solrへデータを送るため、MorphlineSolrSinkを利用します。Morphlineの細かなパイプライン処理の設定はmorphline.confファイル(リスト6)に記述します。
$ vi $FLUME_HOME/conf/morphline.conf
morphlines : [ { # Name used to identify a morphline. E.g. used if there are multiple # morphlines in a morphline config file id : httpd_logs # Import all morphline commands in these java packages and their # subpackages. Other commands that may be present on the classpath are # not visible to this morphline. importCommands : ["org.kitesdk.**", "org.apache.solr.**"] commands : [ { # Parse input attachment and emit a record for each input line readLine { charset : UTF-8 } } { grok { # Consume the output record of the previous command and pipe another # record downstream. # # A grok-dictionary is a config file that contains prefabricated # regular expressions that can be referred to by name. grok patterns # specify such a regex name, plus an optional output field name. # The syntax is %{REGEX_NAME:OUTPUT_FIELD_NAME} # The input line is expected in the "message" input field. dictionaryFiles : [/Users/mosuka/flume/apache-flume-1.5.2-bin/conf/grok-dictionaries] expressions : { message : """%{COMBINEDAPACHELOG}""" } } } # Set values { setValues { event_timestamp : "@{timestamp}" } } { convertTimestamp { field : event_timestamp inputFormats : ["dd/MMM/yyyy:HH:mm:ss Z"] inputTimezone : Asia/Tokyo outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" outputTimezone : UTC } } { geoIP { inputField : clientip database : "/Users/mosuka/flume/apache-flume-1.5.2-bin/conf/GeoLite2-City.mmdb" } } { extractJsonPaths { flatten : true paths : { country_iso_code : /country/iso_code } } } # remove temporary work fields. { setValues { _attachment_body : [] } } # Consume the output record of the previous command, transform it # and pipe the record downstream. # # This command deletes record fields that are unknown to Solr # schema.xml. Recall that Solr throws an exception on any attempt to # load a document that contains a field that isn't specified in # schema.xml. #{ # sanitizeUnknownSolrFields { # # Location from which to fetch Solr schema # solrLocator : { # solrUrl : "http://localhost:8983/solr/collection1/" # Solr URL # # zkHost : "127.0.0.1:2181/solr" # ZooKeeper ensemble # # collection : collection1 # Name of solr collection # } # } #} # log the record at INFO level to SLF4J { logInfo { format : "output record: {}", args : ["@{}"] } } # load the record into a Solr server or MapReduce Reducer { loadSolr { solrLocator : { solrUrl : "http://localhost:8983/solr/httpd_logs" # Solr URL # zkHost : "127.0.0.1:2181/solr" # ZooKeeper ensemble # collection : httpd_logs # Name of solr collection } } } ] } ]
Morphlineの大まかな処理の内容を説明します。
- readLineコマンドで、データをUTF-8エンコードされている前提で1行ずつ読み込みます。
- grokコマンドで、読み込んだデータをApache httpdのログフォーマットでパースします。パースするための正規表現のルールはgrok-dictionariesディレクトリ以下に保存してあります。
- setValuesコマンドで、抽出されたデータのtimestampを、Bananaで必要なevent_timestampというフィールドに値をコピーします。
- convertTimestampコマンドで、event_timestampのタイムゾーン(JST)をUTC(協定世界時)に変換します。これはSolrの日付データはUTCで登録する必要があるためです。おそらく日本でビジネスを展開している場合、タイムスタンプのタイムゾーンにJSTを適用している場合が多いと思いますので、注意してください。event_timestampはBananaダッシュボードで期間を絞り込むために使用しています。
- geoIPコマンドで、抽出されたclientip(接続元のIPアドレス)を元に地域情報データベースGeoLite2-City.mmdbを検索し地域情報をJSON文字列で取得します。取得した地域情報データは_attachment_bodyに保持されます。
- extractJsonPathsコマンドで、XPathのような表記で_attachment_bodyのJSON文字列から、ISO国名コードを取得し、country_iso_codeへ保存します。
- setValuesコマンドで、不要になった_attachment_bodyを削除します。
- logInfoコマンドで、ログ出力を行います。
- loadSolrコマンドで、指定したSolrにデータを送信します。SolrCloudモードでSolrを起動している場合は、zkHostとcollectionを指定してください。
MorphlineやGrok、ISO国名コードについては下記リンクを参考にしてください。