Home | Benchmarks | Archives | Atom Feed

Posted on Sun 31 January 2016

Presto, Parquet & Airpal

I came across a blog post from Brandon Harris recently where he discussed a credit card fraud detection project he'd been working on with a team at the University of Chicago. In the post he described how Presto and Parquet-formatted files had gone a long way to speeding up ad-hoc queries against a ~250GB dataset he's working with.

Presto was born at Facebook and was open sourced within a year of it's inception. It's a distributed query engine capable of running interactive queries against big data sources. There's support for data sources such as Hive, Kafka, PostgreSQL, Redis and Cassandra among many others. Netflix has blogged about their positive experiences with Presto on a 10PB Data Warehouse they've got that's happily handling 2,500 ad-hoc queries a day.

In Brandon's blog post there is a chart showing a query that's executed in Hive against data stored in CSV format taking 130 seconds and then the same query run via Presto against data stored in Parquet format taking less than 5 seconds. I trust the measurements of his queries are accurate but what I'm interested in is what is involved in getting an environment up to run these sorts of queries.

As of this writing Bigtop's Presto support isn't ready (though pull requests are being worked on) so to get an environment up and running locally I'll have to perform some of the installation steps manually.

Launching a Hadoop Cluster in Docker Containers

This process begins with a fresh Ubuntu 15 installation acting as the host for Docker containers that a Hadoop cluster will live within. I discuss getting Ubuntu 15 ready to run Docker in my Hadoop Up and Running blog post.

With Docker ready I'll checkout the Bigtop git repository and launch Ubuntu 14.04-based containers (as of this writing this is the latest supported version of Ubuntu on Intel-based systems).

$ git clone https://github.com/apache/bigtop.git
$ cd bigtop/bigtop-deploy/vm/vagrant-puppet-docker/
$ sudo docker pull bigtop/deploy:ubuntu-14.04
$ vi vagrantconfig.yaml
docker:
        memory_size: "4096"
        image:  "bigtop/deploy:ubuntu-14.04"

boot2docker:
        memory_size: "4096"
        number_cpus: "1"

repo: "http://bigtop-repos.s3.amazonaws.com/releases/1.0.0/ubuntu/trusty/x86_64"
distro: debian
components: [hadoop, yarn, hive]
namenode_ui_port: "50070"
yarn_ui_port: "8088"
hbase_ui_port: "60010"
enable_local_repo: false
smoke_test_components: [mapreduce, pig]
jdk: "openjdk-7-jdk"

While I was writing this blog post Bigtop 1.1 was being cut and the resources from their 1.1.0 endpoint were returning HTTP 403 messages so I've stuck with the 1.0.0 endpoints for now.

$ sudo ./docker-hadoop.sh --create 3
$ sudo vagrant ssh bigtop1

Getting Hive's Metastore Up and Running

By default the Derby embedded database driver is enabled in the boilerplate Hive configurations provided by Bigtop. This driver can only allow one process at a time to connect to it. If you use it Hive's Metastore server won't communicate properly with Presto and you'll get "does not exist" messages every time you try to access a table.

SELECT * FROM track_metadata_csv;
... Table hive.default.track_metadata_csv does not exist

For this reason I've installed MySQL and used it as the data backend for Hive's Metastore.

$ apt-get install \
    libmysql-java \
    mysql-server
$ ln -s /usr/share/java/mysql-connector-java.jar \
        /usr/lib/hive/lib/mysql-connector-java.jar
$ /etc/init.d/mysql start
$ mysql -uroot -proot -e'CREATE DATABASE hcatalog;'

When you install MySQL you'll be prompted to set a root login and password. I've set both values to root in this example. Below is the Hive site configuration file.

$ vi /etc/hive/conf/hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>bigtop1.docker</value>
    </property>

    <property>
        <name>hive.execution.engine</name>
        <value>mr</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost/hcatalog?createDatabaseIfNotExist=true</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>root</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>

    <property>
        <name>hive.hwi.war.file</name>
        <value>/usr/lib/hive/lib/hive-hwi.war</value>
    </property>
</configuration>

I then launched Hive and made sure the Metastore's database exists.

$ hive
CREATE DATABASE IF NOT EXISTS metastore_db;

After that I closed out of Hive and launched it's Metastore service in the background. You'll see it binded to port 9083 if it's running.

$ hive --service metastore &
$ netstat -an | grep 9083

Getting some data to play with

I need some data to play with in this exercise so I dumped the Million Song Dataset to CSV and imported it into HDFS.

$ apt-get install sqlite3
$ wget -c http://labrosa.ee.columbia.edu/millionsong/sites/default/files/AdditionalFiles/track_metadata.db
$ sqlite3 track_metadata.db <<!
.headers off
.mode csv
.output track_metadata.csv
SELECT track_id,
     artist_id,
     artist_familiarity,
     artist_hotttnesss,
     duration,
     year
FROM songs;
!
$ hadoop fs -put \
    track_metadata.csv \
    /tmp/track_metadata.csv

With the CSV file sitting in HDFS I'll create a Hive table for it. Once that table is created I can create a second, Parquet-formatted table and import the data from the first table into the second.

$ hive
CREATE TABLE track_metadata_csv (
    track_id            VARCHAR(18),
    artist_id           VARCHAR(18),
    artist_familiarity  DECIMAL(16, 15),
    artist_hotttnesss   DECIMAL(16, 15),
    duration            DECIMAL(12, 8),
    year                SMALLINT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

LOAD DATA INPATH '/tmp/track_metadata.csv'
INTO TABLE track_metadata_csv;
CREATE TABLE track_metadata_parquet (
    track_id            VARCHAR(18),
    artist_id           VARCHAR(18),
    artist_familiarity  DECIMAL(16, 15),
    artist_hotttnesss   DECIMAL(16, 15),
    duration            DECIMAL(12, 8),
    year                SMALLINT
)
STORED AS parquet;

INSERT INTO TABLE track_metadata_parquet
SELECT * FROM track_metadata_csv;

I now have that table's metadata stored in Hive's Metastore and a Parquet-formatted file of that data sitting on HDFS:

$ hadoop fs -ls \
    /user/hive/warehouse/track_metadata_parquet
Found 1 items
-rw-r--r--   3 root hadoop   45849172 2016-01-31 13:31 /user/hive/warehouse/track_metadata_parquet/000000_0
$ mysqldump \
    -uroot \
    -proot \
    --no-create-info \
    --skip-add-locks \
    --skip-disable-keys \
    --skip-comments \
    hcatalog | \
    grep ^INSERT | \
    sort
INSERT INTO `CDS` VALUES (1),(2);
INSERT INTO `COLUMNS_V2` VALUES
    (1,NULL,'artist_familiarity','decimal(16,15)',2),
    (1,NULL,'artist_hotttnesss','decimal(16,15)',3),
    (1,NULL,'artist_id','varchar(18)',1),
    (1,NULL,'duration','decimal(12,8)',4),
    (1,NULL,'track_id','varchar(18)',0),
    (1,NULL,'year','smallint',5),
    (2,NULL,'artist_familiarity','decimal(16,15)',2),
    (2,NULL,'artist_hotttnesss','decimal(16,15)',3),
    (2,NULL,'artist_id','varchar(18)',1),
    (2,NULL,'duration','decimal(12,8)',4),
    (2,NULL,'track_id','varchar(18)',0),
    (2,NULL,'year','smallint',5);
INSERT INTO `DBS` VALUES
    (1,'Default Hive database','hdfs://bigtop1.docker:8020/user/hive/warehouse','default','public','ROLE'),
    (2,NULL,'hdfs://bigtop1.docker:8020/user/hive/warehouse/metastore_db.db','metastore_db','root','USER');
INSERT INTO `GLOBAL_PRIVS` VALUES (1,1454255378,1,'admin','ROLE','admin','ROLE','All');
INSERT INTO `ROLES` VALUES
    (1,1454255378,'admin','admin'),
    (2,1454255378,'public','public');
INSERT INTO `SDS` VALUES
    (1,1,'org.apache.hadoop.mapred.TextInputFormat','\0','\0','hdfs://bigtop1.docker:8020/user/hive/warehouse/track_metadata_csv',-1,'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',1),
    (2,2,'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat','\0','\0','hdfs://bigtop1.docker:8020/user/hive/warehouse/track_metadata_parquet',-1,'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',2);
INSERT INTO `SEQUENCE_TABLE` VALUES
    ('org.apache.hadoop.hive.metastore.model.MColumnDescriptor',6),
    ('org.apache.hadoop.hive.metastore.model.MDatabase',6),
    ('org.apache.hadoop.hive.metastore.model.MGlobalPrivilege',6),
    ('org.apache.hadoop.hive.metastore.model.MRole',6),
    ('org.apache.hadoop.hive.metastore.model.MSerDeInfo',6),
    ('org.apache.hadoop.hive.metastore.model.MStorageDescriptor',6),
    ('org.apache.hadoop.hive.metastore.model.MTable',6),
    ('org.apache.hadoop.hive.metastore.model.MVersionTable',6);
INSERT INTO `SERDES` VALUES
    (1,NULL,'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'),
    (2,NULL,'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe');
INSERT INTO `SERDE_PARAMS` VALUES
    (1,'field.delim',','),
    (1,'serialization.format',','),
    (2,'serialization.format','1');
INSERT INTO `TABLE_PARAMS` VALUES
    (1,'COLUMN_STATS_ACCURATE','true'),
    (1,'numFiles','2'),
    (1,'numRows','0'),
    (1,'rawDataSize','0'),
    (1,'totalSize','161179824'),
    (1,'transient_lastDdlTime','1454255466'),
    (2,'COLUMN_STATS_ACCURATE','true'),
    (2,'numFiles','2'),
    (2,'numRows','2000000'),
    (2,'rawDataSize','12000000'),
    (2,'totalSize','136579903'),
    (2,'transient_lastDdlTime','1454255498');
INSERT INTO `TBLS` VALUES
    (1,1454255462,1,0,'root',0,1,'track_metadata_csv','MANAGED_TABLE',NULL,NULL),
    (2,1454255471,1,0,'root',0,2,'track_metadata_parquet','MANAGED_TABLE',NULL,NULL);
INSERT INTO `VERSION` VALUES (1,'0.14.0','Set by MetaStore');

Presto Up and Running

Presto requires Java 8 so I'll install that first.

$ add-apt-repository ppa:webupd8team/java
$ apt-get update
$ apt-get install oracle-java8-installer

I have yet to see recent Debian packages for Presto so I'll download the binaries instead.

$ cd ~
$ wget -c https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.133/presto-server-0.133.tar.gz
$ tar xzf presto-server-0.133.tar.gz

Presto requires a data folder for it to store locks, logs and a few other items and also requires a number of configuration files before it can begin to work properly.

$ mkdir -p /root/datap
$ mkdir -p ~/presto-server-0.133/etc/catalog
$ cd ~/presto-server-0.133/etc

Normally I wouldn't suggest creating a data folder in the root partition but this work is all taking place in a Docker container that has exposed 13GB of space on the root partition.

$ df -H
Filesystem      Size  Used Avail Use% Mounted on
none             13G  6.4G  5.6G  54% /
tmpfs           4.2G  8.2k  4.2G   1% /dev
tmpfs           4.2G     0  4.2G   0% /sys/fs/cgroup
/dev/sda1        13G  6.4G  5.6G  54% /vagrant
shm              68M     0   68M   0% /dev/shm

Next I need to create six configuration files. Below is an outline of where they live within the ~/presto-server-0.133/etc folder:

$ tree ~/presto-server-0.133/etc
etc
|-- catalog
|   |-- hive.properties
|   `-- jmx.properties
|-- config.properties
|-- jvm.config
|-- log.properties
`-- node.properties

Here are the commands to set the contents on each of the configuration files.

$ vi ~/presto-server-0.133/etc/config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=800MB
query.max-memory-per-node=200MB
discovery-server.enabled=true
discovery.uri=http://127.0.0.1:8080
$ vi ~/presto-server-0.133/etc/jvm.config
-server
-Xmx800M
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:OnOutOfMemoryError=kill -9 %p
$ vi ~/presto-server-0.133/etc/log.properties
com.facebook.presto=INFO
$ vi ~/presto-server-0.133/etc/node.properties
node.environment=dev
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/root/datap

The JMX connector provides the ability to query Java Management Extensions (JMX) information from all nodes in a Presto cluster. Presto itself is heavily instrumented via JMX.

$ vi ~/presto-server-0.133/etc/catalog/jmx.properties
connector.name=jmx

This file allows Presto to know where our Hive Metastore is and which connector to use to communicate with it.

$ vi ~/presto-server-0.133/etc/catalog/hive.properties
hive.metastore.uri=thrift://bigtop1.docker:9083
connector.name=hive-hadoop2

With those in place you can launch Presto's server.

$ ~/presto-server-0.133/bin/launcher start

It should expose a web frontend on port 8080 if it's running properly.

$ curl localhost:8080

Presto's CLI Up and Running

I'll download the CLI JAR file for Presto, rename it to presto and then I can use it to connect to the Presto Server.

$ cd ~
$ wget -c https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.133/presto-cli-0.133-executable.jar
$ mv presto-cli-0.133-executable.jar presto
$ chmod +x presto
$ ./presto --server localhost:8080 --catalog hive --schema default

I'll then see if Presto can see the schemas in the Hive Metastore.

show schemas from hive;
       Schema
--------------------
 default
 information_schema
 metastore_db
(3 rows)

Executing Queries in Presto

With the CLI communicating with the server properly I'll run two queries. The first will count how many records per year exist in our million song database using the data in the CSV-backed table and the second will do the same against the Parquet-backed table.

SELECT year, count(*)
FROM track_metadata_csv
GROUP BY year
ORDER BY year;
 year | _col1
------+--------
    0 | 968848
 1922 |     12
 1924 |     10
 ...
 2008 |  69540
 2009 |  62102
 2010 |  18794
 2011 |      2
(90 rows)

Query 20160131_160009_00002_s4z65, FINISHED, 1 node
Splits: 6 total, 6 done (100.00%)
0:09 [2M rows, 154MB] [221K rows/s, 17MB/s]
SELECT year, count(*)
FROM track_metadata_parquet
GROUP BY year
ORDER BY year;
 year | _col1
------+--------
    0 | 968848
 1922 |     12
 1924 |     10
 ...
 2008 |  69540
 2009 |  62102
 2010 |  18794
 2011 |      2
(90 rows)

Query 20160131_160411_00005_s4z65, FINISHED, 1 node
Splits: 5 total, 5 done (100.00%)
0:04 [3M rows, 87MB] [799K rows/s, 23.2MB/s]

The query finished in 9 seconds with the CSV-formatted data and in 4 seconds with the Parquet-formatted data. I grepped the server log to see the time lines recorded by the Query Monitor.

$ grep 20160131_160009_00002_s4z65 ~/datap/var/log/server.log
...elapsed 3886.00ms :: planning 644.26ms :: scheduling 1099.00ms :: running 2094.00ms :: finishing 49.00ms...

$ grep 20160131_160411_00005_s4z65 ~/datap/var/log/server.log
...elapsed 1406.00ms :: planning 91.68ms :: scheduling 55.00ms :: running 1221.00ms :: finishing 38.00ms...

I'm not surprised that the columnar-based, Parquet-format-backed query was twice as fast but I'm not sure how the timings the Query Monitor recorded relate to the times reported by the CLI. It gives me something to dig into at a later point.

I took a look at the query plans to see if they offer anything of interest. In the case of these two queries they were identical bar the table names.

EXPLAIN
SELECT year, count(*)
FROM track_metadata_csv
GROUP BY year
ORDER BY year;
                                                                  Query Plan
----------------------------------------------------------------------------------------------------------------------------------------------
 - Output[year, _col1] => [year:bigint, count:bigint]
         _col1 := count
     - Sort[year ASC_NULLS_LAST] => [year:bigint, count:bigint]
         - Exchange[GATHER] => year:bigint, count:bigint
             - Aggregate(FINAL)[year] => [year:bigint, count:bigint]
                     count := "count"("count_9")
                 - Exchange[REPARTITION] => year:bigint, count_9:bigint
                     - Aggregate(PARTIAL)[year] => [year:bigint, count_9:bigint]
                             count_9 := "count"(*)
                         - TableScan[hive:hive:default:track_metadata_csv, originalConstraint = true] => [year:bigint]
                                 LAYOUT: hive
                                 year := HiveColumnHandle{clientId=hive, name=year, hiveType=smallint, hiveColumnIndex=5, partitionKey=false}
EXPLAIN
SELECT year, count(*)
FROM track_metadata_parquet
GROUP BY year
ORDER BY year;
                                                                  Query Plan
----------------------------------------------------------------------------------------------------------------------------------------------
 - Output[year, _col1] => [year:bigint, count:bigint]
         _col1 := count
     - Sort[year ASC_NULLS_LAST] => [year:bigint, count:bigint]
         - Exchange[GATHER] => year:bigint, count:bigint
             - Aggregate(FINAL)[year] => [year:bigint, count:bigint]
                     count := "count"("count_9")
                 - Exchange[REPARTITION] => year:bigint, count_9:bigint
                     - Aggregate(PARTIAL)[year] => [year:bigint, count_9:bigint]
                             count_9 := "count"(*)
                         - TableScan[hive:hive:default:track_metadata_parquet, originalConstraint = true] => [year:bigint]
                                 LAYOUT: hive
                                 year := HiveColumnHandle{clientId=hive, name=year, hiveType=smallint, hiveColumnIndex=5, partitionKey=false}

Airpal: A Web Interface for Presto

In March 2015 AirBNB announced Airpal, a web-based query tool that works with Presto. Beyond a visual interface to run queries it offers access controls, metadata exploration, query progress tracking and CSV exporting of results.

These are the steps I took to install this software and launch it within the same Docker container as Presto.

The first thing I had to do was to get a copy of Airpal's git repository and build the Airpal JAR file.

$ apt-get install \
    build-essential \
    git \
    gradle \
    mysql-server
$ git clone https://github.com/airbnb/airpal.git
$ cd airpal
$ ./gradlew clean shadowJar

I then created a MySQL database for Airpal to store it's configuration and logs.

$ mysql \
    -uroot \
    -proot \
    -e'CREATE DATABASE `airpal` CHARACTER SET utf8 COLLATE utf8_general_ci;'

Airpal needs a configuration file to tell it's JAR file how to, among other things, connect with the MySQL data backend.

$ vi reference.yml
# Logging settings
logging:

  loggers:
    org.apache.shiro: INFO

  # The default level of all loggers. Can be OFF, ERROR, WARN, INFO, DEBUG, TRACE, or ALL.
  level: INFO

# HTTP-specific options.
server:
  applicationConnectors:
    - type: http
      port: 8081
      idleTimeout: 10 seconds

  adminConnectors:
    - type: http
      port: 8082

shiro:
  iniConfigs: ["classpath:shiro_allow_all.ini"]

dataSourceFactory:
  driverClass: com.mysql.jdbc.Driver
  user: root
  password: root
  url: jdbc:mysql://127.0.0.1:3306/airpal

# The URL to the Presto coordinator.
prestoCoordinator: http://127.0.0.1:8080

With that in place I setup Airpal's database schema and launched the server in the background.

$ java -Duser.timezone=UTC \
       -cp build/libs/airpal-*-all.jar \
       com.airbnb.airpal.AirpalApplication \
       db migrate reference.yml
$ java -server \
       -Duser.timezone=UTC \
       -cp build/libs/airpal-*-all.jar \
       com.airbnb.airpal.AirpalApplication \
       server reference.yml &

Their is a health check endpoint on port 8082.

$ curl -s http://127.0.0.1:8082/healthcheck | \
    python -m json.tool
{
    "deadlocks": {
        "healthy": true
    },
    "mysql": {
        "healthy": true
    },
    "presto": {
        "healthy": true
    }
}

If that all looks well the web interface is configured to be exposed on port 8081.

$ curl http://127.0.0.1:8081/app
Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2017 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.