
Posted on Fri 22 February 2019 under Flume

A Minimalist Guide to Flume

Apache Flume is used to collect, aggregate and distribute large amounts of log data. It can operate in a distributed manner and has various fail-over and recovery mechanisms. I've found it most useful for collecting log lines from Kafka topics and grouping them together into files on HDFS.

The project started in 2011 with some of the earliest commits coming from Jonathan Hsieh, Hari Shreedharan and Mike Percy, all of whom work, or at one point worked, for Cloudera. As of this writing the code base is made up of 95K lines of Java.

The building blocks of any Flume agent's configuration are one or more sources of data, one or more channels to transmit that data and one or more sinks to send the data to. Flume is event-driven; it's not something you'd trigger on a scheduled basis. It runs continuously and reacts to new data being presented to it. This contrasts with tools like Airflow, which run scheduled batch operations.
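
A toy model in Python can make the source, channel and sink flow more concrete. This is only an illustration of the idea: the MemoryChannel class and run_agent function below are hypothetical names of my own and have nothing to do with Flume's actual Java API.

```python
from collections import deque


class MemoryChannel:
    """Toy bounded buffer standing in for a Flume channel (not Flume's API)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.events = deque()

    def put(self, event):
        # A full channel pushes back on the source instead of dropping data.
        if len(self.events) >= self.capacity:
            raise OverflowError('channel is full')
        self.events.append(event)

    def take(self, batch_size):
        # Hand the sink up to batch_size events, oldest first.
        batch = []
        while self.events and len(batch) < batch_size:
            batch.append(self.events.popleft())
        return batch


def run_agent(source, channel, sink, batch_size=2):
    """React to each event as it arrives rather than on a schedule."""
    for event in source:
        channel.put(event)
        if len(channel.events) >= batch_size:
            sink.extend(channel.take(batch_size))
    # Drain whatever is left once the source goes quiet.
    sink.extend(channel.take(len(channel.events)))


delivered = []
run_agent(iter(['a', 'b', 'c']), MemoryChannel(capacity=10), delivered)
print(delivered)  # ['a', 'b', 'c']
```

The channel is the only piece that holds state, which is why its type and capacity are the settings to look at when an agent falls behind.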

In this post I'll walk through feeding Nginx web traffic logs into Kafka, enriching them using Python and feeding Flume those enriched records for storage on HDFS.

Installing Prerequisites

The following was run on a fresh Ubuntu 16.04.2 LTS installation. The machine I'm using has an Intel Core i5 4670K clocked at 3.4 GHz, 8 GB of RAM and 1 TB of mechanical storage capacity.

First I've set up a standalone Hadoop environment following the instructions from my Hadoop 3 installation guide. Below I've installed Kafkacat for feeding and reading off of Kafka, libsnappy as I'll be using Snappy compression on the Kafka topics, Python, Screen for running applications in the background and Zookeeper, which is used by Kafka for coordination.

$ sudo apt update
$ sudo apt install \
    kafkacat \
    libsnappy-dev \
    python-pip \
    python-virtualenv \
    screen \
    zookeeperd

I've created a virtual environment for the Python-based dependencies I'll be using. In it I've installed a web traffic log parser, MaxMind's IP location lookup bindings, Kafka client bindings, Pandas, Snappy bindings for Python and a browser agent parser.

$ virtualenv ~/.enrich
$ source ~/.enrich/bin/activate
$ pip install \
    apache-log-parser \
    geoip2 \
    kafka-python \
    pandas \
    python-snappy \
    user-agents

MaxMind's database is updated regularly. Below I've downloaded the latest version and stored it in my home folder.

$ wget -c http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz
$ tar zxf GeoLite2-City.tar.gz
$ mv GeoLite2-City_*/GeoLite2-City.mmdb ~/

Flume & Kafka Up & Running

Below I've installed Flume and Kafka from their respective binary distributions.

$ DIST=http://www-eu.apache.org/dist
$ wget -c -O flume.tar.gz  $DIST/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
$ wget -c -O kafka.tgz     $DIST/kafka/1.1.1/kafka_2.11-1.1.1.tgz

I've stripped the documentation from Flume as it creates ~1,500 files. My view is that documentation should live anywhere but production.

$ sudo mkdir -p /opt/{flume,kafka}

$ sudo tar xzvf kafka.tgz \
    --directory=/opt/kafka \
    --strip 1

$ sudo tar xvf flume.tar.gz \
    --directory=/opt/flume \
    --exclude=apache-flume-1.9.0-bin/docs \
    --strip 1

I'll create and take ownership of the Kafka logs folder so that I can run the service without needing elevated permissions. Make sure to replace mark with the name of your UNIX account.

$ sudo mkdir -p /opt/kafka/logs
$ sudo chown -R mark /opt/kafka/logs

I'll launch the Zookeeper service and, for the sake of simplicity, I'll run Kafka in a screen. I recommend Supervisor for keeping Kafka up and running in production.

$ sudo /etc/init.d/zookeeper start
$ screen
$ /opt/kafka/bin/kafka-server-start.sh \
    /opt/kafka/config/server.properties

Hit CTRL-a and then CTRL-d to detach from the screen session and return to the originating shell.

I'll create two Kafka topics. The first, nginx_log, will be fed the traffic logs as they were generated by Nginx. I'll then have a Python script that will parse, enrich and store the logs in CSV format in a separate topic called nginx_enriched. Since this is a standalone setup with a single disk I'll use a replication factor of 1.

$ for TOPIC in nginx_log nginx_enriched; do
    /opt/kafka/bin/kafka-topics.sh \
        --zookeeper 127.0.0.1:2181 \
        --create \
        --partitions 1 \
        --replication-factor 1 \
        --topic $TOPIC
  done

Below is the configuration for the Flume agent. It will read messages off the nginx_enriched Kafka topic and transport them using a memory channel to HDFS. The data will initially live in a temporary folder on HDFS until the record limit has been reached, at which point it'll store the resulting files under a /kafka topic name/year/month/day naming convention for the folder hierarchy. The records are stored in CSV format. Later on Hive will have a table pointed at this folder giving SQL access to the data as it comes in.

$ vi ~/kafka_to_hdfs.conf
feed1.sources  = kafka-source-1
feed1.channels = hdfs-channel-1
feed1.sinks    = hdfs-sink-1

feed1.sources.kafka-source-1.type             = org.apache.flume.source.kafka.KafkaSource
feed1.sources.kafka-source-1.channels         = hdfs-channel-1
feed1.sources.kafka-source-1.topic            = nginx_enriched
feed1.sources.kafka-source-1.batchSize        = 1000
feed1.sources.kafka-source-1.zookeeperConnect = 127.0.0.1:2181

feed1.channels.hdfs-channel-1.type                = memory
feed1.channels.hdfs-channel-1.capacity            = 1000
feed1.channels.hdfs-channel-1.transactionCapacity = 1000

feed1.sinks.hdfs-sink-1.channel                = hdfs-channel-1
feed1.sinks.hdfs-sink-1.hdfs.filePrefix        = hits
feed1.sinks.hdfs-sink-1.hdfs.fileType          = DataStream
feed1.sinks.hdfs-sink-1.hdfs.inUsePrefix       = tmp/
feed1.sinks.hdfs-sink-1.hdfs.path              = /%{topic}/year=%Y/month=%m/day=%d
feed1.sinks.hdfs-sink-1.hdfs.rollCount         = 100
feed1.sinks.hdfs-sink-1.hdfs.rollSize          = 0
feed1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
feed1.sinks.hdfs-sink-1.hdfs.writeFormat       = Text
feed1.sinks.hdfs-sink-1.type                   = hdfs
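
To see how the hdfs.path template above turns into a folder name, here is a rough Python sketch of the substitution. It assumes that %{topic} is filled from an event header named topic and that %Y, %m and %d behave like their strftime counterparts; it covers only the escapes used in this config and the expand_path helper is my own invention, not part of Flume.

```python
import re
from datetime import datetime


def expand_path(template, headers, when):
    """Expand %{header} references, then strftime-style date escapes."""
    with_headers = re.sub(r'%\{(\w+)\}',
                          lambda m: headers[m.group(1)],
                          template)
    return when.strftime(with_headers)


path = expand_path('/%{topic}/year=%Y/month=%m/day=%d',
                   {'topic': 'nginx_enriched'},
                   datetime(2019, 2, 20))
print(path)  # /nginx_enriched/year=2019/month=02/day=20
```

With hdfs.useLocalTimeStamp set to true, the date comes from the agent's clock at write time rather than from anything inside the log line.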

If you run into out-of-memory issues you can change the channel's type from "memory" to either "spillablememory" or "file". The Flume documentation covers how to tune these types of channels.

I'll launch the Flume agent in a screen. This is another candidate for running under Supervisor in production.

$ screen
$ /opt/flume/bin/flume-ng agent \
    -n feed1 \
    -c conf \
    -f ~/kafka_to_hdfs.conf \
    -Dflume.root.logger=INFO,console

Hit CTRL-a and then CTRL-d to detach from the screen session and return to the originating shell.

Feeding Data into Kafka

I've created a sample Nginx web traffic log file. Here is what the first three lines look like.

$ head -n3 access.log
1.2.3.4 - - [17/Feb/2019:08:41:54 +0000] "GET / HTTP/1.1" 200 7032 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36" "-"
1.2.3.4 - - [17/Feb/2019:08:41:54 +0000] "GET /theme/images/mark.jpg HTTP/1.1" 200 9485 "https://tech.marksblogg.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36" "-"
1.2.3.4 - - [17/Feb/2019:08:41:55 +0000] "GET /architecting-modern-data-platforms-book-review.html HTTP/1.1" 200 10822 "https://tech.marksblogg.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36" "-"
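
For a feel of the fields packed into each of those lines, here is a minimal sketch that pulls them apart with a regular expression from Python's standard library. It is written for Python 3 and is not the parser used later in this post; the pattern and the field names are my own and only assume the layout seen in the three sample lines, with the trailing quoted field ignored.

```python
import re

LOG_PATTERN = re.compile(
    r'(?P<remote_host>\S+) \S+ \S+ '
    r'\[(?P<time_received>[^\]]+)\] '
    r'"(?P<request_method>\S+) (?P<request_url>\S+) '
    r'HTTP/(?P<request_http_ver>[^"]+)" '
    r'(?P<status>\d{3}) (?P<response_bytes>\d+|-) '
    r'"(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"'
)


def parse_line(line):
    """Return the named fields of one log line, or None if it doesn't match."""
    match = LOG_PATTERN.match(line)
    return match.groupdict() if match else None


sample = ('1.2.3.4 - - [17/Feb/2019:08:41:54 +0000] "GET / HTTP/1.1" '
          '200 7032 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
          'AppleWebKit/537.36 (KHTML, like Gecko) '
          'Chrome/72.0.3626.109 Safari/537.36" "-"')
fields = parse_line(sample)
print(fields['request_method'], fields['request_url'], fields['status'])
# GET / 200
```

Every value comes back as a string, so anything numeric such as the status code still needs casting downstream.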

I'll feed these logs into the nginx_log Kafka topic. Each line will exist as an individual message in that topic.

$ cat access.log \
    | kafkacat -P \
               -b localhost:9092 \
               -t nginx_log \
               -z snappy
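
If you'd rather feed the topic from Python than from kafkacat, the same one-message-per-line idea can be sketched as below. The feed_lines helper is hypothetical, and skipping blank lines is a choice I've made for the sketch. The send argument is any callable taking a topic and a payload, which lets the example run without a broker.

```python
def feed_lines(lines, send, topic='nginx_log'):
    """Send each non-empty line as its own message; return how many were sent."""
    sent = 0
    for line in lines:
        payload = line.rstrip('\n')
        if not payload:
            continue  # skip blank lines (a choice made for this sketch)
        send(topic, payload.encode('utf-8'))
        sent += 1
    return sent


# Stand-in for a real producer: collect messages in a list.
outbox = []
count = feed_lines(['first line\n', '\n', 'second line\n'],
                   lambda topic, value: outbox.append((topic, value)))
print(count, outbox)
```

Against a live broker, the send callable could be the send method of a kafka-python KafkaProducer created with compression_type='snappy', followed by a call to the producer's flush method once the file has been read.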

I can then check that the logs are stored as expected in Kafka.

$ kafkacat -C \
           -b localhost:9092 \
           -t nginx_log \
           -o beginning \
    | less -S

Enriching Nginx Logs

I'm going to use a Python script to read each of the log lines from Kafka, parse, enrich and store them back onto a new Kafka topic. The enrichment steps include attempting to look up the city of each visitor's IP address and parsing the user agent string into a simple browser name and version.

I've used a group identifier for consuming Kafka topics so that I can run multiple instances of this script and they can share the workload. This is handy for scaling out enrichment tasks that are bound by the compute resources of a single process.
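
One caveat on sharing the workload: within a consumer group each partition is read by at most one member, and the topics above were created with a single partition. The sketch below deals partitions out round-robin to show the effect. It is a simplification of my own rather than Kafka's actual assignment logic, and the consumer names are made up.

```python
def assign_partitions(partitions, consumers):
    """Deal partitions out to consumers round-robin."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


one_partition = assign_partitions([0], ['enricher-a', 'enricher-b'])
print(one_partition)
# {'enricher-a': [0], 'enricher-b': []}

four_partitions = assign_partitions([0, 1, 2, 3], ['enricher-a', 'enricher-b'])
print(four_partitions)
# {'enricher-a': [0, 2], 'enricher-b': [1, 3]}
```

With one partition the second enricher sits idle, so the nginx_log topic would need at least as many partitions as there are script instances before the extra processes do any work.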

I'll flush the newly created messages to Kafka every 500 messages. Note that this script expects there is always more data to push things along. If you have a finite ending to your dataset there would need to be logic in place to push the un-flushed records into Kafka.
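
One way that end-of-input logic could look is sketched below, written for Python 3. FakeProducer is a stand-in I've made up so the example runs without a broker; its send and flush methods only mirror the names used by kafka-python's producer, and the forward helper is hypothetical.

```python
class FakeProducer:
    """Stand-in for a Kafka producer that records what has been flushed."""

    def __init__(self):
        self.pending = []
        self.flushed = []

    def send(self, topic, value):
        self.pending.append((topic, value))

    def flush(self):
        self.flushed.extend(self.pending)
        self.pending = []


def forward(messages, producer, topic, flush_every=500):
    """Send every message, flushing periodically and once more at the end."""
    try:
        for count, value in enumerate(messages, start=1):
            producer.send(topic, value)
            if count % flush_every == 0:
                producer.flush()
    finally:
        # Runs when the input ends or an error escapes, so a finite
        # dataset does not leave its tail un-flushed.
        producer.flush()


producer = FakeProducer()
forward(range(7), producer, 'nginx_enriched', flush_every=3)
print(len(producer.flushed), len(producer.pending))  # 7 0
```

The final flush lives in a finally clause so it also runs if the loop is interrupted part-way through.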

$ python
from   StringIO import StringIO

import apache_log_parser
import geoip2.database as geoip
from   kafka import (KafkaConsumer,
                     KafkaProducer)
import pandas as pd
from   urlparse import urlparse
from   user_agents import parse as ua_parse


geo_lookup  = geoip.Reader('GeoLite2-City.mmdb')
log_format  = r'%h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-Agent}i"'
line_parser = apache_log_parser.make_parser(log_format)

group_id = 'nginx_log_enrichers'
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         group_id=group_id,
                         auto_offset_reset='smallest')
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         retries=5,
                         acks='all')

consumer.subscribe(['nginx_log'])

for msg_count, msg in enumerate(consumer):
    out = {}

    try:
        req = line_parser(msg.value)
    except apache_log_parser.LineDoesntMatchException as exc:
        print exc
        continue

    url_ = urlparse(req['request_url'])

    out['url_scheme']   = url_.scheme
    out['url_netloc']   = url_.netloc
    out['url_path']     = url_.path
    out['url_params']   = url_.params
    out['url_query']    = url_.query
    out['url_fragment'] = url_.fragment

    for key in ('remote_host',
                'request_method',
                'request_http_ver',
                'status',
                'response_bytes_clf',):
        out[key] = None

        if req.get(key, None):
            if type(req.get(key, None)) is bool:
                out[key] = req.get(key)
            elif len(req.get(key).strip()):
                out[key] = req.get(key).strip()

    agent_ = ua_parse(req['request_header_user_agent'])

    for x in range(0, 3):
        try:
            out['browser_%d' % x] = \
                agent_.browser[x][0] if x == 1 else agent_.browser[x]
        except IndexError:
            out['browser_%d' % x] = None

    location_                   = geo_lookup.city(req['remote_host'])
    out['loc_city_name']        = location_.city.name
    out['loc_country_iso_code'] = location_.country.iso_code
    out['loc_continent_code']   = location_.continent.code

    output = StringIO()
    pd.DataFrame([out]).to_csv(output,
                               index=False,
                               header=False,
                               encoding='utf-8')

    producer.send('nginx_enriched', output.getvalue().strip())

    if msg_count and not msg_count % 500:
        producer.flush()

The enriched log lines look like the following prior to being serialised into CSV format.

{'browser_0': 'Chrome',
 'browser_1': 72,
 'browser_2': '72.0.3626',
 'loc_city_name': u'Tallinn',
 'loc_continent_code': u'EU',
 'loc_country_iso_code': u'EE',
 'remote_host': '1.2.3.4',
 'request_http_ver': '1.1',
 'request_method': 'GET',
 'response_bytes_clf': '7032',
 'status': '200',
 'url_fragment': '',
 'url_netloc': '',
 'url_params': '',
 'url_path': '/',
 'url_query': '',
 'url_scheme': ''}
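
Because the CSV is written without a header, whatever reads it later has to match fields to columns purely by position. The sketch below, written for Python 3 with only the standard library, makes that order explicit rather than leaving it to the DataFrame. The COLUMNS list is simply the alphabetical order of the keys shown above, and to_csv_line is a hypothetical helper rather than anything the script uses.

```python
import csv
import io

COLUMNS = ['browser_0', 'browser_1', 'browser_2',
           'loc_city_name', 'loc_continent_code', 'loc_country_iso_code',
           'remote_host', 'request_http_ver', 'request_method',
           'response_bytes_clf', 'status',
           'url_fragment', 'url_netloc', 'url_params',
           'url_path', 'url_query', 'url_scheme']


def to_csv_line(record, columns=COLUMNS):
    """Serialise one record as a headerless CSV line in a fixed column order."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator='')
    # Missing or None values become empty fields.
    writer.writerow(['' if record.get(c) is None else record[c]
                     for c in columns])
    return buf.getvalue()


record = {'browser_0': 'Chrome', 'browser_1': 72, 'browser_2': '72.0.3626',
          'loc_city_name': 'Tallinn', 'loc_continent_code': 'EU',
          'loc_country_iso_code': 'EE', 'remote_host': '1.2.3.4',
          'request_http_ver': '1.1', 'request_method': 'GET',
          'response_bytes_clf': '7032', 'status': '200',
          'url_fragment': '', 'url_netloc': '', 'url_params': '',
          'url_path': '/', 'url_query': '', 'url_scheme': ''}
line = to_csv_line(record)
print(line)
# Chrome,72,72.0.3626,Tallinn,EU,EE,1.2.3.4,1.1,GET,7032,200,,,,/,,
```

Adding a key to the record without adding it to the column list leaves the output unchanged, which keeps downstream column positions stable.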

While the above script is running I can see the following being reported by the Flume agent.

.. kafka.SourceRebalanceListener: topic nginx_enriched - partition 0 assigned.
.. hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
.. hdfs.BucketWriter: Creating /nginx_enriched/year=2019/month=02/day=20/tmp/hits.1550663242571.tmp
.. hdfs.HDFSEventSink: Writer callback called.
.. hdfs.BucketWriter: Closing /nginx_enriched/year=2019/month=02/day=20/tmp/hits.1550663242571.tmp
.. hdfs.BucketWriter: Renaming /nginx_enriched/year=2019/month=02/day=20/tmp/hits.1550663242571.tmp to /nginx_enriched/year=2019/month=02/day=20/hits.1550663242571
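
The create, close and rename cycle in that log is driven by the hdfs.rollCount = 100 setting. As a rough sketch of the count-based trigger only, the hypothetical roll function below groups events into closed files and one still-open file. It ignores time-based rolling (the sink's hdfs.rollInterval setting, which this configuration leaves at its default) and makes no attempt to reproduce Flume's file naming.

```python
def roll(events, roll_count=100):
    """Group events into closed files of roll_count plus one open remainder."""
    closed, open_file = [], []
    for event in events:
        open_file.append(event)
        if len(open_file) == roll_count:
            # In Flume this is where the .tmp file is closed and renamed.
            closed.append(open_file)
            open_file = []
    return closed, open_file


closed, still_open = roll(range(250))
print([len(f) for f in closed], len(still_open))  # [100, 100] 50
```

The remainder stays in the temporary folder until something else closes it, so a query may not see the most recent records straight away.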

Setting Up Hive Tables

With the data landing in HDFS I'll create a table in Hive that will point to the CSV-formatted data. I'll also create a separate table that will hold a copy of that data in compressed, columnar form using ORC-formatted files. Presto will be used to convert the CSV-formatted data into ORC later on. Columnar form can be two orders of magnitude quicker to query and an order of magnitude smaller than row-oriented data.

$ hive
CREATE EXTERNAL TABLE hits (
    browser_0            STRING,
    browser_1            INTEGER,
    browser_2            STRING,
    loc_city_name        STRING,
    loc_continent_code   VARCHAR(4),
    loc_country_iso_code VARCHAR(3),
    remote_host          VARCHAR(15),
    request_http_ver     FLOAT,
    request_method       VARCHAR(10),
    response_bytes_clf   BIGINT,
    status               SMALLINT,
    url_fragment         STRING,
    url_netloc           STRING,
    url_params           STRING,
    url_path             STRING,
    url_query            STRING,
    url_scheme           STRING
) PARTITIONED BY (year SMALLINT, month VARCHAR(2), day VARCHAR(2))
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION '/nginx_enriched/';

CREATE TABLE hits_orc (
    browser_0            STRING,
    browser_1            INTEGER,
    browser_2            STRING,
    loc_city_name        STRING,
    loc_continent_code   VARCHAR(4),
    loc_country_iso_code VARCHAR(3),
    remote_host          VARCHAR(15),
    request_http_ver     FLOAT,
    request_method       VARCHAR(10),
    response_bytes_clf   BIGINT,
    status               SMALLINT,
    url_fragment         STRING,
    url_netloc           STRING,
    url_params           STRING,
    url_path             STRING,
    url_query            STRING,
    url_scheme           STRING
) PARTITIONED BY (year SMALLINT, month VARCHAR(2), day VARCHAR(2))
  STORED AS orc;
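
The case for keeping an ORC copy rests on columnar layout: values from one column sit next to each other and tend to repeat. The toy below pivots a few made-up rows into columns and run-length encodes each one. It only illustrates the intuition; ORC's real encodings are more elaborate and none of this is its API.

```python
from itertools import groupby

# Made-up sample rows: (browser, country, status).
rows = [
    ('Chrome', 'EE', 200),
    ('Chrome', 'EE', 200),
    ('Chrome', 'EE', 404),
    ('Firefox', 'EE', 200),
]

# Pivot the rows into one tuple per column.
columns = list(zip(*rows))


def run_length_encode(values):
    """Collapse consecutive repeats into (value, run length) pairs."""
    return [(value, len(list(run))) for value, run in groupby(values)]


encoded = [run_length_encode(column) for column in columns]
print(encoded[0])  # [('Chrome', 3), ('Firefox', 1)]
print(encoded[1])  # [('EE', 4)]
print(encoded[2])  # [(200, 2), (404, 1), (200, 1)]
```

A query that only needs the country column can also skip the other two entirely, which is where much of the speed difference comes from.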

The data is partitioned by year, month and day on HDFS; both month and day can have leading zeros so I'll use the VARCHAR type to store them. I'll run the following to add any new partitions to the Hive metastore.

MSCK REPAIR TABLE hits;
MSCK REPAIR TABLE hits_orc;

I can now check that Hive can see the existing partition.

SHOW PARTITIONS hits;
year=2019/month=02/day=20
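
The leading-zero point is easy to check outside of Hive. The small Python sketch below splits a partition string like the one above into its keys and values, using a hypothetical parse_partition helper, and shows what an integer type would lose.

```python
def parse_partition(spec):
    """Split 'key=value/key=value' into a dict, keeping values as strings."""
    return dict(part.split('=', 1) for part in spec.split('/'))


partition = parse_partition('year=2019/month=02/day=20')
print(partition)  # {'year': '2019', 'month': '02', 'day': '20'}

# Casting to an integer drops the zero that is part of the folder name.
print(int(partition['month']))          # 2
print('%02d' % int(partition['month']))  # 02
```

Storing month and day as strings means the partition value matches the folder name on HDFS character for character.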

Converting CSVs to ORC Format

Finally, I'll convert the CSV-formatted table contents into a separate, ORC-formatted table using Presto. I've found Presto to be the fastest query engine for converting CSV data into ORC format.

$ presto \
    --server localhost:8080 \
    --catalog hive \
    --schema default
INSERT INTO hits_orc
SELECT * FROM hits;

With the data loaded into ORC format I can run aggregate queries on the dataset.

SELECT loc_city_name,
       COUNT(*)
FROM hits_orc
GROUP BY 1;
 loc_city_name | _col1
---------------+-------
 Tallinn       |   119

Copyright © 2014 - 2019 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.