SHOEISHA iD

※旧SEメンバーシップ会員の方は、同じ登録情報(メールアドレス&パスワード)でログインいただけます

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

Yahoo! JAPAN 黒帯シリーズ

OSSのツール「Solr」「Flume」「Banana」の組み合わせによるデータ可視化プラットフォーム構築

Yahoo! JAPAN 黒帯シリーズ 第2回 ~ 「Solr黒帯」によるデータ可視化環境構築の解説

  • X ポスト
  • このエントリーをはてなブックマークに追加

5. Flumeのセットアップ

 ここまでで、SolrとBananaのセットアップが完了しました。ここからは、データ可視化プラットフォームのETL(Extract/Transform/Load)ツールの役割をするFlumeのインストールに移ります。

5.1 Flumeのインストール

 FlumeはASFのプロジェクトサイト、またはアーカイブサイトからダウンロード可能です。ホームディレクトリ($HOME)下にflumeディレクトリを作成し、Flumeをダウンロードします。

$ mkdir $HOME/flume
$ wget -P $HOME/flume http://archive.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz

 ダウンロードした圧縮ファイルを展開するとインストールは完了です。環境変数$FLUME_HOMEの設定も併せて行います。

$ tar xvzf $HOME/flume/apache-flume-1.5.2-bin.tar.gz -C $HOME/flume
$ export FLUME_HOME=$HOME/flume/apache-flume-1.5.2-bin

5.2 差分ファイルのダウンロード

 今回、FlumeからSolrへデータを送ると同時に、Morphlineを使用してデータの加工も行いたいと思います。しかし、Flume-1.5.2にはMorphlineを使ってデータを加工したり、Solrへデータを送信したりするのに必要なライブラリが同梱されていません。下記コマンドを実行し、今回必要なJARファイルをダウンロードします。

$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/net/sf/saxon/Saxon-HE/9.5.1-5/Saxon-HE-9.5.1-5.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/james/apache-mime4j-core/0.7.2/apache-mime4j-core-0.7.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/james/apache-mime4j-dom/0.7.2/apache-mime4j-dom-0.7.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/typesafe/config/1.2.1/config-1.2.1.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/httpcomponents/httpmime/4.2.1/httpmime-4.2.1.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/maxmind/db/maxmind-db/1.0.0/maxmind-db-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.1/jackson-core-2.5.1.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.1/jackson-databind-2.5.1.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.1/jackson-annotations-2.5.1.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-hadoop-compatibility/1.0.0/kite-hadoop-compatibility-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-avro/1.0.0/kite-morphlines-avro-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-core/1.0.0/kite-morphlines-core-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-core/1.0.0/kite-morphlines-hadoop-core-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-parquet-avro/1.0.0/kite-morphlines-hadoop-parquet-avro-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-rcfile/1.0.0/kite-morphlines-hadoop-rcfile-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-sequencefile/1.0.0/kite-morphlines-hadoop-sequencefile-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-json/1.0.0/kite-morphlines-json-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-maxmind/1.0.0/kite-morphlines-maxmind-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-metrics-servlets/1.0.0/kite-morphlines-metrics-servlets-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-saxon/1.0.0/kite-morphlines-saxon-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-solr-cell/1.0.0/kite-morphlines-solr-cell-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-solr-core/1.0.0/kite-morphlines-solr-core-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-tika-core/1.0.0/kite-morphlines-tika-core-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-tika-decompress/1.0.0/kite-morphlines-tika-decompress-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-twitter/1.0.0/kite-morphlines-twitter-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-useragent/1.0.0/kite-morphlines-useragent-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-core/3.0.2/metrics-core-3.0.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-healthchecks/3.0.2/metrics-healthchecks-3.0.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-json/3.0.2/metrics-json-3.0.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-jvm/3.0.2/metrics-jvm-3.0.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-servlets/3.0.2/metrics-servlets-3.0.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/noggit/noggit/0.7/noggit-0.7.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/solr/solr-core/4.10.4/solr-core-4.10.4.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/solr/solr-solrj/4.10.4/solr-solrj-4.10.4.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/tika/tika-core/1.5/tika-core-1.5.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/tika/tika-parsers/1.5/tika-parsers-1.5.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/tika/tika-xmp/1.5/tika-xmp-1.5.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar

 Morphlineの中でGrokという正規表現に似た方法を用いて、データを抽出する機能を使用します。そのGrokで必要な設定ファイルをダウンロードします。

$ mkdir $FLUME_HOME/conf/grok-dictionaries
$ wget -P $FLUME_HOME/conf/grok-dictionaries https://raw.githubusercontent.com/cloudera/cdk/master/cdk-morphlines/cdk-morphlines-core/src/test/resources/grok-dictionaries/grok-patterns

 httpdのログには接続元のIPアドレスが含まれます。そのIPアドレスからどの地域からアクセスしているかを可視化できるように準備します。MaxMind社(以下「MaxMind」)が無償で提供しているIPアドレスと地域情報をマッピングしたデータベースがありますので、同社のページからダウンロードしconfディレクトリへ展開しましょう。

$ wget -P $FLUME_HOME/conf http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz
$ gunzip $FLUME_HOME/conf/GeoLite2-City.mmdb.gz

5.3 Flumeの設定

 次に、Flumeの設定ファイルの準備をします。テンプレートファイルがありますので、flume-conf.propertiesとflume-env.shの2つをコピーします。

$ cp $FLUME_HOME/conf/flume-conf.properties.template $FLUME_HOME/conf/flume-conf.properties
$ cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh

 まず、flume-env.shを編集します。JAVA_OPTSでヒープサイズを大きくしましょう。Flumeではデフォルトが20MBとなっており、大きなクラスやファイルをロードするだけでOutOfMemoryExceptionが発生してしまいます。また、Flumeの設定ファイルが置かれているディレクトリをFLUME_CONF_DIRで指定しておくとよいでしょう。リスト4にflume-env.shの内容を記載します。

$ vi $FLUME_HOME/conf/flume-env.sh
リスト4 flume-env.sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.

# Enviroment variables can be set here.

#JAVA_HOME=/usr/lib/jvm/java-6-sun

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
#JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
JAVA_OPTS="-Xms256m -Xmx512m"

# Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""

FLUME_CONF_DIR=$FLUME_HOME/conf

 続いてflume-conf.propertiesを編集しましょう。FlumeのAgentアーキテクチャは図10のようにSource、Channel、Sinkの3つのコンポーネントに細かく分かれています。Sourceは外部データソースからデータを読み込む、ETLのExtract(抽出)にあたる部分です。ChannelはSourceから流れてきたデータを一時的にためておくキューの役割を担います。Sinkが最終的なDWHへデータを出力するETLのLoad(ロード)にあたる役割をします。また、FlumeのSinkには、Morphlineという入力データの加工を行う、ETLのTransform(加工・変換)の役割をする機能を提供するものもあります。

 リスト5にflume-conf.propertiesの内容を記載します。

図10 Flumeアーキテクチャ(https://flume.apache.org/FlumeUserGuide.htmlより引用)
図10 Flumeアーキテクチャ(https://flume.apache.org/FlumeUserGuide.htmlより引用)
表4 Flume Agent コンポーネント
コンポーネント 役割
Source 外部データソースからのデータ抽出します。
Channel 抽出したデータを一時的に貯めておくキューの役割をします。
Sink DWHへデータをロードします。
Morphline データを必要に応じて加工・変換を行います。
$ vi $FLUME_HOME/conf/flume-conf.properties
リスト5 flume-conf.properties
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

agent.sources = execSrc
agent.channels = memoryChannel
agent.sinks = solrSink

# For each one of the sources, the type is defined
agent.sources.execSrc.type = exec
agent.sources.execSrc.command = tail -c 0 -f /tmp/access.log

# The channel can be defined as follows.
agent.sources.execSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent.sinks.solrSink.morphlineFile = /Users/mosuka/flume/apache-flume-1.5.2-bin/conf/morphline.conf

#Specify the channel the sink should use
agent.sinks.solrSink.channel = memoryChannel
agent.sinks.solrSink.batchSize = 1000
agent.sinks.solrSink.batchDurationMillis = 1000

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100000
agent.channels.memoryChannel.transactionCapacity = 100000
補足

 agent.sinks.solrSink.morphlineFileで指定しているパスは筆者の環境になります。ご自身で適宜パスを修正してください。

 Sourceの設定は、/tmp/access.logに取得したいログファイルが出力されていると想定し、tailコマンドを実行してログファイルの差分を読み込むようになっています。可視化したいログファイルのパスに併せて変更するとよいでしょう。

 Sinkの設定は、Solrへデータを送るため、MorphlineSolrSinkを利用します。Morphlineの細かなパイプライン処理の設定はmorphline.confファイル(リスト6)に記述します。

$ vi $FLUME_HOME/conf/morphline.conf
リスト6 morphline.conf
morphlines : [
  {
    # Name used to identify a morphline. E.g. used if there are multiple
    # morphlines in a morphline config file
    id : httpd_logs

    # Import all morphline commands in these java packages and their
    # subpackages. Other commands that may be present on the classpath are
    # not visible to this morphline.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      {
        # Parse input attachment and emit a record for each input line
        readLine {
          charset : UTF-8
        }
      }

      {
        grok {
          # Consume the output record of the previous command and pipe another
          # record downstream.
          #
          # A grok-dictionary is a config file that contains prefabricated
          # regular expressions that can be referred to by name. grok patterns
          # specify such a regex name, plus an optional output field name.
          # The syntax is %{REGEX_NAME:OUTPUT_FIELD_NAME}
          # The input line is expected in the "message" input field.
          dictionaryFiles : [/Users/mosuka/flume/apache-flume-1.5.2-bin/conf/grok-dictionaries]
          expressions : {
            message : """%{COMBINEDAPACHELOG}"""
          }
        }
      }

      # Set values
      {
        setValues {
          event_timestamp : "@{timestamp}"
        }
      }

      {
        convertTimestamp {
          field : event_timestamp
          inputFormats : ["dd/MMM/yyyy:HH:mm:ss Z"]
          inputTimezone : Asia/Tokyo
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : UTC
        }
      }

      {
        geoIP {
          inputField : clientip
          database : "/Users/mosuka/flume/apache-flume-1.5.2-bin/conf/GeoLite2-City.mmdb"
        }
      }

      {
        extractJsonPaths {
          flatten : true
          paths : {
            country_iso_code : /country/iso_code
          }
        }
      }

      # remove temporary work fields.
      {
        setValues {
          _attachment_body : []
        }
      }

      # Consume the output record of the previous command, transform it
      # and pipe the record downstream.
      #
      # This command deletes record fields that are unknown to Solr
      # schema.xml. Recall that Solr throws an exception on any attempt to
      # load a document that contains a field that isn't specified in
      # schema.xml.
      #{
      #  sanitizeUnknownSolrFields {
      #    # Location from which to fetch Solr schema
      #    solrLocator : {
      #      solrUrl : "http://localhost:8983/solr/collection1/"  # Solr URL
      #      # zkHost : "127.0.0.1:2181/solr"                     # ZooKeeper ensemble
      #      # collection : collection1                           # Name of solr collection
      #    }
      #  }
      #}

      # log the record at INFO level to SLF4J
      { logInfo { format : "output record: {}", args : ["@{}"] } }

      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : {
            solrUrl : "http://localhost:8983/solr/httpd_logs"  # Solr URL
            # zkHost : "127.0.0.1:2181/solr"                       # ZooKeeper ensemble
            # collection : httpd_logs                           # Name of solr collection
          }
        }
      }
    ]
  }
]

 Morphlineの大まかな処理の内容を説明します。

  1. readLineコマンドで、データをUTF-8エンコードされている前提で1行ずつ読み込みます。
  2. grokコマンドで、読み込んだデータをApache httpdのログフォーマットでパースします。パースするための正規表現のルールはgrok-dictionariesディレクトリ以下に保存してあります。
  3. setValuesコマンドで、抽出されたデータのtimestampを、Bananaで必要なevent_timestampというフィールドに値をコピーします。
  4. convertTimestampコマンドで、event_timestampのタイムゾーン(JST)をUTC(協定世界時)に変換します。これはSolrの日付データはUTCで登録する必要があるためです。おそらく日本でビジネスを展開している場合、タイムスタンプのタイムゾーンにJSTを適用している場合が多いと思いますので、注意してください。event_timestampはBananaダッシュボードで期間を絞り込むために使用しています。
  5. geoIPコマンドで、抽出されたclientip(接続元のIPアドレス)を元に地域情報データベースGeoLite2-City.mmdbを検索し地域情報をJSON文字列で取得します。取得した地域情報データは_attachment_bodyに保持されます。
  6. extractJsonPathsコマンドで、XPathのような表記で_attachment_bodyのJSON文字列から、ISO国名コードを取得し、country_iso_codeへ保存します。
  7. setValuesコマンドで、不要になった_attachment_bodyを削除します。
  8. logInfoコマンドで、ログ出力を行います。
  9. loadSolrコマンドで、指定したSolrにデータを送信します。SolrCloudモードでSolrを起動している場合は、zkHostとcollectionを指定してください。

 MorphlineやGrok、ISO国名コードについては下記リンクを参考にしてください。

次のページ
6. Flumeの起動

この記事は参考になりましたか?

  • X ポスト
  • このエントリーをはてなブックマークに追加
Yahoo! JAPAN 黒帯シリーズ連載記事一覧

もっと読む

この記事の著者

大須賀 稔(ヤフー株式会社)(オオスカ ミノル)

1975年埼玉県生まれ。インフォシーク入社後、同社の吸収合併により楽天、米国Ask.com日本法人、楽天(復帰)、ロンウイットを経て、2014年ヤフー株式会社入社。2013年、Apacheソフトウェア財団の運営するSolrの関連プロジェクトであるManifoldCFのコミッターに就任。「[改訂新版]...

※プロフィールは、執筆時点、または直近の記事の寄稿時点での内容です

この記事は参考になりましたか?

この記事をシェア

  • X ポスト
  • このエントリーをはてなブックマークに追加
CodeZine(コードジン)
https://codezine.jp/article/detail/8707 2015/05/28 14:00

おすすめ

アクセスランキング

アクセスランキング

イベント

CodeZine編集部では、現場で活躍するデベロッパーをスターにするためのカンファレンス「Developers Summit」や、エンジニアの生きざまをブーストするためのイベント「Developers Boost」など、さまざまなカンファレンスを企画・運営しています。

新規会員登録無料のご案内

  • ・全ての過去記事が閲覧できます
  • ・会員限定メルマガを受信できます

メールバックナンバー

アクセスランキング

アクセスランキング