5. Setting up Flume
5.1 Installing Flume
$ mkdir $HOME/flume
$ wget -P $HOME/flume http://archive.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
$ tar xvzf $HOME/flume/apache-flume-1.5.2-bin.tar.gz -C $HOME/flume
$ export FLUME_HOME=$HOME/flume/apache-flume-1.5.2-bin
5.2 Downloading the additional files
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/net/sf/saxon/Saxon-HE/9.5.1-5/Saxon-HE-9.5.1-5.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/james/apache-mime4j-core/0.7.2/apache-mime4j-core-0.7.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/james/apache-mime4j-dom/0.7.2/apache-mime4j-dom-0.7.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/typesafe/config/1.2.1/config-1.2.1.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/httpcomponents/httpmime/4.2.1/httpmime-4.2.1.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/maxmind/db/maxmind-db/1.0.0/maxmind-db-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.1/jackson-core-2.5.1.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.1/jackson-databind-2.5.1.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.1/jackson-annotations-2.5.1.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-hadoop-compatibility/1.0.0/kite-hadoop-compatibility-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-avro/1.0.0/kite-morphlines-avro-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-core/1.0.0/kite-morphlines-core-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-core/1.0.0/kite-morphlines-hadoop-core-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-parquet-avro/1.0.0/kite-morphlines-hadoop-parquet-avro-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-rcfile/1.0.0/kite-morphlines-hadoop-rcfile-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-hadoop-sequencefile/1.0.0/kite-morphlines-hadoop-sequencefile-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-json/1.0.0/kite-morphlines-json-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-maxmind/1.0.0/kite-morphlines-maxmind-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-metrics-servlets/1.0.0/kite-morphlines-metrics-servlets-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-saxon/1.0.0/kite-morphlines-saxon-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-solr-cell/1.0.0/kite-morphlines-solr-cell-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-solr-core/1.0.0/kite-morphlines-solr-core-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-tika-core/1.0.0/kite-morphlines-tika-core-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-tika-decompress/1.0.0/kite-morphlines-tika-decompress-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-twitter/1.0.0/kite-morphlines-twitter-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/kitesdk/kite-morphlines-useragent/1.0.0/kite-morphlines-useragent-1.0.0.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-core/3.0.2/metrics-core-3.0.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-healthchecks/3.0.2/metrics-healthchecks-3.0.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-json/3.0.2/metrics-json-3.0.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-jvm/3.0.2/metrics-jvm-3.0.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/com/codahale/metrics/metrics-servlets/3.0.2/metrics-servlets-3.0.2.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/noggit/noggit/0.7/noggit-0.7.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/solr/solr-core/4.10.4/solr-core-4.10.4.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/solr/solr-solrj/4.10.4/solr-solrj-4.10.4.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/tika/tika-core/1.5/tika-core-1.5.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/tika/tika-parsers/1.5/tika-parsers-1.5.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/tika/tika-xmp/1.5/tika-xmp-1.5.jar
$ wget -P $FLUME_HOME/lib http://central.maven.org/maven2/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar
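Every URL in the list above follows the Maven repository layout: the groupId with its dots turned into slashes, then the artifactId, the version, and finally artifactId-version.jar. The bash sketch below builds the same URLs from groupId:artifactId:version coordinates and downloads them in a loop. The function names maven_url and download_all and the MAVEN_BASE variable are hypothetical. MAVEN_BASE defaults to the host used above. If that host does not respond, you can point it at another Maven Central address such as https://repo1.maven.org/maven2.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build Maven repository URLs from coordinates.
# MAVEN_BASE is an assumption; it defaults to the host used in the list above.
MAVEN_BASE="${MAVEN_BASE:-http://central.maven.org/maven2}"

# maven_url GROUP:ARTIFACT:VERSION -> prints the URL of that artifact's jar
maven_url() {
  local group artifact version
  IFS=: read -r group artifact version <<< "$1"
  # Repository layout: dots in the groupId become directory separators
  printf '%s/%s/%s/%s/%s-%s.jar\n' \
    "$MAVEN_BASE" "${group//.//}" "$artifact" "$version" "$artifact" "$version"
}

# The same 38 jars as the wget commands above
JARS=(
  net.sf.saxon:Saxon-HE:9.5.1-5
  org.apache.james:apache-mime4j-core:0.7.2
  org.apache.james:apache-mime4j-dom:0.7.2
  com.typesafe:config:1.2.1
  org.apache.httpcomponents:httpmime:4.2.1
  com.maxmind.db:maxmind-db:1.0.0
  com.fasterxml.jackson.core:jackson-core:2.5.1
  com.fasterxml.jackson.core:jackson-databind:2.5.1
  com.fasterxml.jackson.core:jackson-annotations:2.5.1
  org.kitesdk:kite-hadoop-compatibility:1.0.0
  org.kitesdk:kite-morphlines-avro:1.0.0
  org.kitesdk:kite-morphlines-core:1.0.0
  org.kitesdk:kite-morphlines-hadoop-core:1.0.0
  org.kitesdk:kite-morphlines-hadoop-parquet-avro:1.0.0
  org.kitesdk:kite-morphlines-hadoop-rcfile:1.0.0
  org.kitesdk:kite-morphlines-hadoop-sequencefile:1.0.0
  org.kitesdk:kite-morphlines-json:1.0.0
  org.kitesdk:kite-morphlines-maxmind:1.0.0
  org.kitesdk:kite-morphlines-metrics-servlets:1.0.0
  org.kitesdk:kite-morphlines-saxon:1.0.0
  org.kitesdk:kite-morphlines-solr-cell:1.0.0
  org.kitesdk:kite-morphlines-solr-core:1.0.0
  org.kitesdk:kite-morphlines-tika-core:1.0.0
  org.kitesdk:kite-morphlines-tika-decompress:1.0.0
  org.kitesdk:kite-morphlines-twitter:1.0.0
  org.kitesdk:kite-morphlines-useragent:1.0.0
  com.codahale.metrics:metrics-core:3.0.2
  com.codahale.metrics:metrics-healthchecks:3.0.2
  com.codahale.metrics:metrics-json:3.0.2
  com.codahale.metrics:metrics-jvm:3.0.2
  com.codahale.metrics:metrics-servlets:3.0.2
  org.noggit:noggit:0.7
  org.apache.solr:solr-core:4.10.4
  org.apache.solr:solr-solrj:4.10.4
  org.apache.tika:tika-core:1.5
  org.apache.tika:tika-parsers:1.5
  org.apache.tika:tika-xmp:1.5
  org.apache.zookeeper:zookeeper:3.4.6
)

# download_all: fetch every jar in JARS into $FLUME_HOME/lib
download_all() {
  if [ -z "${FLUME_HOME:-}" ]; then
    echo "FLUME_HOME is not set" >&2
    return 1
  fi
  local coord
  for coord in "${JARS[@]}"; do
    wget -P "$FLUME_HOME/lib" "$(maven_url "$coord")" || return 1
  done
}
```

First export FLUME_HOME as in 5.1. Running download_all should then fetch the same jars as the individual wget commands. The loop stops at the first failed download.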
$ mkdir $FLUME_HOME/conf/grok-dictionaries
$ wget -P $FLUME_HOME/conf/grok-dictionaries https://raw.githubusercontent.com/cloudera/cdk/master/cdk-morphlines/cdk-morphlines-core/src/test/resources/grok-dictionaries/grok-patterns
$ wget -P $FLUME_HOME/conf http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz
$ gunzip $FLUME_HOME/conf/GeoLite2-City.mmdb.gz
5.3 Configuring Flume
$ cp $FLUME_HOME/conf/flume-conf.properties.template $FLUME_HOME/conf/flume-conf.properties
$ cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh
$ vi $FLUME_HOME/conf/flume-env.sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.

# Environment variables can be set here.

#JAVA_HOME=/usr/lib/jvm/java-6-sun

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
#JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
JAVA_OPTS="-Xms256m -Xmx512m"

# Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""
FLUME_CONF_DIR=$FLUME_HOME/conf
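| Component | Role |
|---|---|
| Source | Extracts data from external data sources. |
| Channel | Acts as a queue that temporarily buffers the extracted data. |
| Sink | Loads the data into the DWH. |
| Morphline | Processes and transforms the data as needed. |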
$ vi $FLUME_HOME/conf/flume-conf.properties
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

agent.sources = execSrc
agent.channels = memoryChannel
agent.sinks = solrSink

# For each one of the sources, the type is defined
agent.sources.execSrc.type = exec
agent.sources.execSrc.command = tail -c 0 -f /tmp/access.log

# The channel can be defined as follows.
agent.sources.execSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent.sinks.solrSink.morphlineFile = /Users/mosuka/flume/apache-flume-1.5.2-bin/conf/morphline.conf

#Specify the channel the sink should use
agent.sinks.solrSink.channel = memoryChannel
agent.sinks.solrSink.batchSize = 1000
agent.sinks.solrSink.batchDurationMillis = 1000

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100000
agent.channels.memoryChannel.transactionCapacity = 100000
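The exec source runs `tail -c 0 -f /tmp/access.log`, so it starts at the current end of the file. It forwards only lines appended after the agent starts, and lines already in the file are not sent. To push a test event through the pipeline, append a line while the agent is running. The bash sketch below does this. The function name append_sample_line, the ACCESS_LOG override and the field values are hypothetical test data and are not part of the original setup.

```shell
#!/usr/bin/env bash
# Hypothetical test helper: append one Apache "combined" format log line to
# the file tailed by the exec source (agent.sources.execSrc.command).
ACCESS_LOG="${ACCESS_LOG:-/tmp/access.log}"

# append_sample_line [CLIENT_IP]
append_sample_line() {
  local ip="${1:-203.0.113.10}" ts
  # Timestamp as dd/MMM/yyyy:HH:mm:ss +hhmm, matching the morphline's
  # inputFormats. LC_ALL=C keeps month names in English; TZ=JST-9 is a
  # POSIX TZ string for UTC+9, so no time zone database is required.
  ts=$(LC_ALL=C TZ=JST-9 date '+%d/%b/%Y:%H:%M:%S %z')
  printf '%s - - [%s] "GET /index.html HTTP/1.1" 200 1024 "-" "curl/7.35.0"\n' \
    "$ip" "$ts" >> "$ACCESS_LOG"
}
```

For example, start the agent with `$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume-conf.properties --name agent`. The name must match the `agent.` prefix in the properties file. Then call append_sample_line from another terminal. 203.0.113.10 is a documentation-only address, so geoIP is unlikely to find location data for it. To exercise that step, pass a real client address as the first argument.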
$ vi $FLUME_HOME/conf/morphline.conf
morphlines : [
  {
    # Name used to identify a morphline. E.g. used if there are multiple
    # morphlines in a morphline config file
    id : httpd_logs

    # Import all morphline commands in these java packages and their
    # subpackages. Other commands that may be present on the classpath are
    # not visible to this morphline.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      {
        # Parse input attachment and emit a record for each input line
        readLine {
          charset : UTF-8
        }
      }

      {
        grok {
          # Consume the output record of the previous command and pipe another
          # record downstream.
          #
          # A grok-dictionary is a config file that contains prefabricated
          # regular expressions that can be referred to by name. grok patterns
          # specify such a regex name, plus an optional output field name.
          # The syntax is %{REGEX_NAME:OUTPUT_FIELD_NAME}
          # The input line is expected in the "message" input field.
          dictionaryFiles : [/Users/mosuka/flume/apache-flume-1.5.2-bin/conf/grok-dictionaries]
          expressions : {
            message : """%{COMBINEDAPACHELOG}"""
          }
        }
      }

      # Set values
      {
        setValues {
          event_timestamp : "@{timestamp}"
        }
      }

      {
        convertTimestamp {
          field : event_timestamp
          inputFormats : ["dd/MMM/yyyy:HH:mm:ss Z"]
          inputTimezone : Asia/Tokyo
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : UTC
        }
      }

      {
        geoIP {
          inputField : clientip
          database : "/Users/mosuka/flume/apache-flume-1.5.2-bin/conf/GeoLite2-City.mmdb"
        }
      }

      {
        extractJsonPaths {
          flatten : true
          paths : {
            country_iso_code : /country/iso_code
          }
        }
      }

      # remove temporary work fields.
      {
        setValues {
          _attachment_body : []
        }
      }

      # Consume the output record of the previous command, transform it
      # and pipe the record downstream.
      #
      # This command deletes record fields that are unknown to Solr
      # schema.xml. Recall that Solr throws an exception on any attempt to
      # load a document that contains a field that isn't specified in
      # schema.xml.
      #{
      #  sanitizeUnknownSolrFields {
      #    # Location from which to fetch Solr schema
      #    solrLocator : {
      #      solrUrl : "http://localhost:8983/solr/collection1/" # Solr URL
      #      # zkHost : "" # ZooKeeper ensemble
      #      # collection : collection1 # Name of solr collection
      #    }
      #  }
      #}

      # log the record at INFO level to SLF4J
      {
        logInfo {
          format : "output record: {}", args : ["@{}"]
        }
      }

      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : {
            solrUrl : "http://localhost:8983/solr/httpd_logs" # Solr URL
            # zkHost : "" # ZooKeeper ensemble
            # collection : httpd_logs # Name of solr collection
          }
        }
      }
    ]
  }
]
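- The readLine command reads the data one line at a time, assuming it is UTF-8 encoded.
- The grok command parses each line as an Apache httpd access log entry (the COMBINEDAPACHELOG pattern). The regular-expression rules used for parsing are stored under the grok-dictionaries directory.
- The setValues command copies the timestamp extracted from the line into a field named event_timestamp, which Banana requires.
- The convertTimestamp command converts event_timestamp from JST to UTC (Coordinated Universal Time), because Solr requires dates to be indexed in UTC. If your business operates in Japan, your timestamps are most likely in JST, so take care with this step. The Banana dashboard uses event_timestamp to narrow results to a time range.
- The geoIP command looks up the extracted clientip (the client's IP address) in the GeoLite2-City.mmdb geolocation database and retrieves the location information as JSON. The result is held in _attachment_body.
- The extractJsonPaths command uses an XPath-like notation to read the ISO country code from the JSON in _attachment_body and stores it in country_iso_code.
- The setValues command removes _attachment_body, which is no longer needed.
- The logInfo command writes each record to the log.
- The loadSolr command sends the record to the specified Solr instance. If Solr is running in SolrCloud mode, specify zkHost and collection.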
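The bash sketch below makes the grok and convertTimestamp steps more concrete by applying roughly the same transformation to a single access-log line outside Flume. It is not Kite's implementation.

- extract_fields pulls out only clientip and timestamp, a small subset of what COMBINEDAPACHELOG captures.
- to_solr_utc converts a dd/MMM/yyyy:HH:mm:ss Z timestamp into the UTC yyyy-MM-dd'T'HH:mm:ss.SSS'Z' form configured above. It uses the +hhmm offset written in the line itself.

All helper names are hypothetical. The calendar arithmetic uses Howard Hinnant's days-from-civil algorithm rather than the GNU-specific `date -d`, so it should also work with the date command on macOS.

```shell
#!/usr/bin/env bash
# Illustrative sketch only (not Kite's implementation): approximates what
# grok and convertTimestamp do to one access-log line. Helper names are
# hypothetical; dates are assumed to fall in the year 1 or later.

# extract_fields LINE -> "clientip<TAB>timestamp", a small subset of the
# fields the COMBINEDAPACHELOG grok pattern captures
extract_fields() {
  local re='^([^ ]+) [^ ]+ [^ ]+ \[([^]]+)\]'
  [[ $1 =~ $re ]] || return 1
  printf '%s\t%s\n' "${BASH_REMATCH[1]}" "${BASH_REMATCH[2]}"
}

# days_from_civil Y M D -> days since 1970-01-01 (Hinnant's algorithm)
days_from_civil() {
  local y=$1 m=$2 d=$3
  # Years start in March here, so Jan/Feb belong to the previous year
  if (( m <= 2 )); then y=$(( y - 1 )); fi
  local era=$(( y / 400 ))
  local yoe=$(( y - era * 400 ))
  local mp=$(( m > 2 ? m - 3 : m + 9 ))
  local doy=$(( (153 * mp + 2) / 5 + d - 1 ))
  local doe=$(( yoe * 365 + yoe / 4 - yoe / 100 + doy ))
  echo $(( era * 146097 + doe - 719468 ))
}

# civil_from_days N -> yyyy-MM-dd for N days since 1970-01-01
civil_from_days() {
  local z=$(( $1 + 719468 ))
  local era=$(( z / 146097 ))
  local doe=$(( z - era * 146097 ))
  local yoe=$(( (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365 ))
  local doy=$(( doe - (365 * yoe + yoe / 4 - yoe / 100) ))
  local mp=$(( (5 * doy + 2) / 153 ))
  local d=$(( doy - (153 * mp + 2) / 5 + 1 ))
  local m=$(( mp < 10 ? mp + 3 : mp - 9 ))
  printf '%04d-%02d-%02d' $(( yoe + era * 400 + (m <= 2 ? 1 : 0) )) "$m" "$d"
}

# to_solr_utc "dd/MMM/yyyy:HH:mm:ss +hhmm" -> "yyyy-MM-ddTHH:mm:ss.000Z" (UTC)
to_solr_utc() {
  local re='^([0-9]{2})/([A-Z][a-z]{2})/([0-9]{4}):([0-9]{2}):([0-9]{2}):([0-9]{2}) ([+-])([0-9]{2})([0-9]{2})$'
  [[ $1 =~ $re ]] || return 1
  # 10# forces base 10 so values such as "08" and "09" are not read as octal
  local day=$(( 10#${BASH_REMATCH[1]} )) mon=${BASH_REMATCH[2]}
  local year=$(( 10#${BASH_REMATCH[3]} ))
  local hh=$(( 10#${BASH_REMATCH[4]} )) mi=$(( 10#${BASH_REMATCH[5]} ))
  local ss=$(( 10#${BASH_REMATCH[6]} )) sign=${BASH_REMATCH[7]}
  local off=$(( 10#${BASH_REMATCH[8]} * 3600 + 10#${BASH_REMATCH[9]} * 60 ))
  [[ $sign == - ]] && off=$(( -off ))
  # Month number from the abbreviation's position in this list
  local months=JanFebMarAprMayJunJulAugSepOctNovDec
  local prefix=${months%%"$mon"*}
  (( ${#prefix} % 3 == 0 && ${#prefix} < 36 )) || return 1
  local month=$(( ${#prefix} / 3 + 1 ))
  # Local wall-clock time minus the UTC offset gives UTC seconds
  local secs=$(( $(days_from_civil "$year" "$month" "$day") * 86400 + hh * 3600 + mi * 60 + ss - off ))
  local udays=$(( secs / 86400 )) rem=$(( secs % 86400 ))
  if (( rem < 0 )); then rem=$(( rem + 86400 )); udays=$(( udays - 1 )); fi
  printf '%sT%02d:%02d:%02d.000Z\n' "$(civil_from_days "$udays")" \
    $(( rem / 3600 )) $(( rem % 3600 / 60 )) $(( rem % 60 ))
}
```

For example, `to_solr_utc '01/Jan/2015:08:30:00 +0900'` prints 2014-12-31T23:30:00.000Z. A JST timestamp shortly after midnight falls on the previous day in UTC, which is exactly the shift to watch for in the convertTimestamp step.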