
Posted on Mon 22 February 2016

A Billion Taxi Rides in Hive & Presto

A few weeks ago I published a blog post on importing the metadata of a billion+ taxi rides in New York City into Redshift. Since then I've been looking at getting the same dataset into other data stores.

This blog post will cover the steps I took to pull all billion+ records into a Hadoop cluster running in Docker containers on my local machine. I used Docker containers so that I could simulate the steps and shell commands that one would need on an actual Hadoop cluster without having to invest in multiple machines.

I'll contrast the Presto and Hive execution engines and the underlying data formats of ORC and Parquet.

Why ORC over Parquet?

Hive and Presto support working with data stored in several formats. Among others, Optimized Row Columnar (ORC) and Parquet-formatted data can be read from, and in some cases written to, by Hive and Presto. Each format has its own advantages and trade-offs as well as inconsistent behaviours when being used by Hive and Presto.

For example, as of this writing, VARCHAR(n) and STRING fields in Parquet format aren't yet supported in Presto. INT and SMALLINT data types will be cast to BIGINT when used with Presto but DECIMAL fields need to be changed to the DOUBLE data type for Presto to be able to see them.

At the 2014 Hadoop Summit, Dain Sundstrom, one of the members of the Presto team at Facebook, stated that ORC-formatted data was performing the best among the file formats he was using in his microbenchmarks with Presto. He went on to state that, at the time, the Parquet reader code in Hive left a lot to be desired in terms of performance.

Presto's ORC reader supports predicate pushdown, the ability to skip past unneeded data, lazy reads and vectorized reads which, up until recently, had yet to be ported to the Parquet reader in Presto. ORC files, much like AWS Redshift's Zone Maps, contain the minimum and maximum value of each column per stripe (about 1 million rows) and also for every 10,000 rows. These features can bring significant performance increases for certain workloads.
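
To make the min/max idea concrete, here is a minimal sketch of how a reader could decide which stripes to skip. The stripe statistics, the stripe_stats and stripes_to_read function names and the three-stripe layout are all made up for illustration; real ORC readers work from statistics stored in the file itself rather than from a text listing.

```shell
# Hypothetical per-stripe statistics for one integer column,
# one stripe per line in the form "stripe_id min max".
stripe_stats() {
    cat <<'EOF'
0 1 1000000
1 1000001 2000000
2 2000001 3000000
EOF
}

# Print the IDs of the stripes whose [min, max] range could contain the
# value given as $1. Every other stripe can be skipped without being read.
stripes_to_read() {
    stripe_stats | awk -v v="$1" '$2 <= v && v <= $3 { print $1 }'
}

# Which stripes might hold the value 1500000?
stripes_to_read 1500000
```

With these made-up figures only stripe 1 would need to be read for an equality predicate on 1500000. A value outside every range, such as 5000000, would let the reader skip the whole file.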

Why Presto & ORC over PostgreSQL?

Presto can use multiple threads per worker across multiple machines when executing a query. If you have three computers each with a 4-core processor then you'd effectively have 12 cores to execute your query across. PostgreSQL is process-based rather than thread-based so a single query can only use a single CPU core. PostgreSQL doesn't yet support the task of breaking up a single query so that it can run across multiple machines.

ORC format uses run-length encoding (read: compression) on integer columns and dictionary encoding on string columns. This means the data stored as an ORC file will use up less disk space than it would as a PostgreSQL database or as gzip files.

When the trip data was imported into PostgreSQL it was 321 GB. Adding b-tree indices to the various columns could potentially double the size of the whole database.

When the data was denormalised and dumped to CSV format it was around 500 GB uncompressed and 104 GB when gzip'ed.

When the data is imported into HDFS using out-of-the-box settings and stored in ORC format it only needs 42 GB of space on each HDFS data node that the data lives on.
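
As a rough back-of-the-envelope check, the sizes quoted above can be turned into ratios. This sketch only uses the figures from this post; the ratio function name is my own, and the 42 GB figure is per data node holding the data, so cluster-wide usage will depend on the replication settings.

```shell
# Sizes in GB as reported in this post.
postgres_gb=321
csv_gb=500
gzip_gb=104
orc_gb=42

# Print how many times larger $1 is than $2, to one decimal place.
ratio() {
    awk -v a="$1" -v b="$2" 'BEGIN { printf "%.1f\n", a / b }'
}

echo "Uncompressed CSV vs ORC: $(ratio "$csv_gb" "$orc_gb")x"
echo "PostgreSQL vs ORC:       $(ratio "$postgres_gb" "$orc_gb")x"
echo "gzip'ed CSV vs ORC:      $(ratio "$gzip_gb" "$orc_gb")x"
```

On these numbers the ORC copy is roughly 11.9x smaller than the uncompressed CSV, 7.6x smaller than the PostgreSQL database and about 2.5x smaller than the gzip files.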

Hive Up and Running

Hive is used to create the ORC-formatted files Presto will work with and run the Metastore that Presto relies on as well. But Hive won't be used to run any analytical queries from Presto itself. Facebook have stated that Presto is able to run queries significantly faster than Hive as my benchmarks below will show. Despite that, as of version 0.138 of Presto, there are some steps in the ETL process that Presto still leans on Hive for.

This process begins with a fresh Ubuntu 15 installation acting as the host for the Docker containers. Apache Bigtop will then run various scripts to set up a Hadoop cluster within those containers. I've allocated 8 GB of memory to the machine and there is 2 TB of mechanical drive space available.

To start, the following will set up the dependencies needed to build and run the Docker containers. Docker will be installed via a 3rd-party repository so we'll need to add that repository's credentials and details first.

$ sudo apt-get install software-properties-common
$ sudo apt-key adv \
    --keyserver hkp://p80.pool.sks-keyservers.net:80 \
    --recv-keys 58118E89F3A912897C070ADBF76221572C52609D
$ sudo add-apt-repository \
    "deb https://apt.dockerproject.org/repo ubuntu-wily main"
$ sudo apt-get update

If there happens to be any existing Docker installation on the system make sure it's removed to avoid any version conflicts.

$ sudo apt-get purge lxc-docker

You may need extra drivers which are left out of the base kernel package that comes with Ubuntu. I found with Ubuntu 15 this wasn't needed but if the following command does install anything then restart your box after the installation completes.

$ sudo apt-get install linux-image-extra-$(uname -r)

If you see the following then no extra drivers were installed and you can continue on without a reboot.

0 to upgrade, 0 to newly install, 0 to remove and 83 not to upgrade.

The following will install Docker, Virtualbox and Vagrant:

$ sudo apt-get install \
    docker-engine \
    virtualbox \
    vagrant
$ sudo service docker start

With Docker ready I'll check out the Bigtop git repository and launch the Ubuntu 14.04-based containers (as of this writing this is the latest supported version of Ubuntu on Intel-based systems).

$ git clone https://github.com/apache/bigtop.git
$ cd bigtop/bigtop-deploy/vm/vagrant-puppet-docker/
$ sudo docker pull bigtop/deploy:ubuntu-14.04

Below are the settings I've saved into the vagrantconfig.yaml configuration file.

$ vi vagrantconfig.yaml
docker:
        memory_size: "8192"
        image:  "bigtop/deploy:ubuntu-14.04"

boot2docker:
        memory_size: "8192"
        number_cpus: "2"

repo: "http://bigtop-repos.s3.amazonaws.com/releases/1.0.0/ubuntu/trusty/x86_64"
distro: debian
components: [hadoop, yarn, hive]
namenode_ui_port: "50070"
yarn_ui_port: "8088"
hbase_ui_port: "60010"
enable_local_repo: false
smoke_test_components: [mapreduce, pig]
jdk: "openjdk-7-jdk"

I'll then launch a 3-node cluster.

$ sudo ./docker-hadoop.sh --create 3

Downloading 104 GB of Taxi Data

In my previous blog post on importing 1.1 billion taxi trips into Redshift, I created 56 ~2 GB gzip files containing denormalised taxi trip data in CSV format. You should generate these files if you haven't already and store them in the ~/bigtop/bigtop-deploy/vm/vagrant-puppet-docker folder so they'll be available to each Docker container in the Hadoop cluster.

There will be a need to refer to these files throughout this exercise so I'll create a manifest file of their file names. These lines shouldn't contain any paths so we can use them in S3 URLs and HDFS paths without needing to transform them.

$ vi trips.manifest
trips_xaa.csv.gz
trips_xab.csv.gz
trips_xac.csv.gz
trips_xad.csv.gz
trips_xae.csv.gz
trips_xaf.csv.gz
trips_xag.csv.gz
trips_xah.csv.gz
trips_xai.csv.gz
trips_xaj.csv.gz
trips_xak.csv.gz
trips_xal.csv.gz
trips_xam.csv.gz
trips_xan.csv.gz
trips_xao.csv.gz
trips_xap.csv.gz
trips_xaq.csv.gz
trips_xar.csv.gz
trips_xas.csv.gz
trips_xat.csv.gz
trips_xau.csv.gz
trips_xav.csv.gz
trips_xaw.csv.gz
trips_xax.csv.gz
trips_xay.csv.gz
trips_xaz.csv.gz
trips_xba.csv.gz
trips_xbb.csv.gz
trips_xbc.csv.gz
trips_xbd.csv.gz
trips_xbe.csv.gz
trips_xbf.csv.gz
trips_xbg.csv.gz
trips_xbh.csv.gz
trips_xbi.csv.gz
trips_xbj.csv.gz
trips_xbk.csv.gz
trips_xbl.csv.gz
trips_xbm.csv.gz
trips_xbn.csv.gz
trips_xbo.csv.gz
trips_xbp.csv.gz
trips_xbq.csv.gz
trips_xbr.csv.gz
trips_xbs.csv.gz
trips_xbt.csv.gz
trips_xbu.csv.gz
trips_xbv.csv.gz
trips_xbw.csv.gz
trips_xbx.csv.gz
trips_xby.csv.gz
trips_xbz.csv.gz
trips_xca.csv.gz
trips_xcb.csv.gz
trips_xcc.csv.gz
trips_xcd.csv.gz
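
Typing out 56 names by hand is error-prone. As a rough sketch, assuming the file names always follow the two-letter suffix sequence shown above (xaa through xcd), the same list could be generated instead. The gen_manifest function name and its optional file-count argument are made up for this example.

```shell
# Print the first N manifest entries (56 by default) in the same
# order as the listing above: trips_xaa.csv.gz, trips_xab.csv.gz, ...
gen_manifest() {
    total="${1:-56}"
    count=0
    for first in a b c; do
        for second in a b c d e f g h i j k l m n o p q r s t u v w x y z; do
            count=$((count + 1))
            # Stop once the requested number of names has been printed.
            [ "$count" -gt "$total" ] && return 0
            echo "trips_x${first}${second}.csv.gz"
        done
    done
}

# Show the final entry as a quick sanity check.
gen_manifest | tail -n 1
```

Redirecting the output, for example with gen_manifest > trips.manifest, would produce the manifest file itself. The sketch only covers suffixes up to xcz, which is more than the 56 files used here.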

If you've kept these files in an S3 bucket then the following should download them. Be sure to change the bucket name to the one you've used.

$ sudo apt-get install s3cmd
$ s3cmd --configure

$ while read filename; do
      s3cmd get s3://trips_metadata_example/$filename
  done < trips.manifest
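
With 104 GB to transfer it's worth confirming that nothing was dropped along the way. Below is a minimal sketch that reports any manifest entry that is missing or empty on disk. The missing_files function name is my own, and the check is deliberately shallow: it doesn't confirm that a file downloaded in full.

```shell
# Print every file named in the manifest ($1) that is missing
# or zero bytes long in the download directory ($2).
missing_files() {
    while read -r filename; do
        [ -s "$2/$filename" ] || echo "$filename"
    done < "$1"
}

# Only run the check if a manifest exists in the current directory.
if [ -f trips.manifest ]; then
    missing_files trips.manifest .
fi
```

No output means every file in the manifest is present. Running gzip -t against each file would be a stricter check, at the cost of reading all of the compressed data.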

SSH into the Hadoop Cluster

With all that in place I'll ssh into the first Bigtop instance.

$ sudo vagrant ssh bigtop1

Inside the container I'll configure the Hive Metastore. This contains all the metadata needed for Hive and Presto to work with the data files we'll be storing in HDFS. The Metastore will be configured to use MySQL as its data backend.

$ apt-get -y install \
    libmysql-java \
    mysql-server

$ ln -s /usr/share/java/mysql-connector-java.jar \
        /usr/lib/hive/lib/mysql-connector-java.jar

$ /etc/init.d/mysql start

$ mysql -uroot -proot -e'CREATE DATABASE hcatalog;'

When you install MySQL you'll be prompted to set a root login and password. I've set both values to root for this exercise.

Below is the Hive site configuration file.

$ vi /etc/hive/conf/hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>bigtop1.docker</value>
    </property>

    <property>
        <name>hive.execution.engine</name>
        <value>mr</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost/hcatalog?createDatabaseIfNotExist=true</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>root</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>

    <property>
        <name>hive.hwi.war.file</name>
        <value>/usr/lib/hive/lib/hive-hwi.war</value>
    </property>
</configuration>

I then made sure the Metastore's database exists.

$ echo 'CREATE DATABASE IF NOT EXISTS metastore_db;' | hive

After that I launched Hive's Metastore service in the background. You'll see it bound to TCP port 9083 if it's running.

$ hive --service metastore &
$ netstat -an | grep 9083

Set Up Tables in Hive

We will need a holding table to bring the CSV-formatted data into before we can load that data into an ORC-formatted table. The CSV-formatted table will be loaded up one file at a time. That data will then be inserted into an ORC-formatted table and the CSV-formatted table will be truncated.

Presto doesn't support as many data types as Hive in either ORC or Parquet-formatted files. In Presto, any column using a data type that is supported in Hive but isn't supported in Presto simply won't appear among the fields that are using supported data types. I've had cases where 50-odd fields were showing for a table in Hive and only 29 fields appeared when I loaded the same table up in Presto.

If you use the INT and SMALLINT data types they'll be cast to BIGINT when used with Presto. DECIMAL fields need to be changed to use the DOUBLE data type before Presto will be able to see them.

Below I'll set up two Hive tables, one to import the CSV data into and an ORC-formatted table that will soon contain all billion+ records from the taxi dataset.

$ hive
CREATE TABLE trips_csv (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(6,3),
    fare_amount             DECIMAL(6,2),
    extra                   DECIMAL(6,2),
    mta_tax                 DECIMAL(6,2),
    tip_amount              DECIMAL(6,2),
    tolls_amount            DECIMAL(6,2),
    ehail_fee               DECIMAL(6,2),
    improvement_surcharge   DECIMAL(6,2),
    total_amount            DECIMAL(6,2),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

CREATE TABLE trips_orc (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc;
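
The only differences between the two column lists above are the VARCHAR(n) fields becoming STRING and the DECIMAL fields becoming DOUBLE. As a sketch of how that change could be applied mechanically rather than by hand, the filter below rewrites those two types and leaves everything else alone. The to_presto_types name is hypothetical, and the column alignment of the output will differ from the hand-formatted DDL.

```shell
# Read column definitions on stdin and rewrite the types that had to be
# changed for the ORC table: VARCHAR(n) -> STRING, DECIMAL(p,s) -> DOUBLE.
to_presto_types() {
    sed -E \
        -e 's/VARCHAR\([0-9]+\)/STRING/g' \
        -e 's/DECIMAL\([0-9]+, *[0-9]+\)/DOUBLE/g'
}

# Example: convert a single column definition.
echo 'trip_distance DECIMAL(6,3),' | to_presto_types
```

The trailing ROW FORMAT and STORED AS clauses still need to be changed by hand, as does the table name.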

Importing 1.1 Billion Trips into an ORC-Formatted Table

The following commands will list all the file names in the manifest file one at a time. Each file will then be loaded from the vagrant mount into HDFS. Once in HDFS, each file will be imported into a CSV-formatted table. Once that data is loaded into the table the original file on HDFS will automatically be removed. The data in the CSV-formatted table will then be loaded into the ORC-formatted table. Once that's done the CSV-formatted table will be truncated.

while read filename; do
    echo $filename
    hadoop fs -put /vagrant/$filename /tmp/
    echo "LOAD DATA INPATH '/tmp/$filename'
          INTO TABLE trips_csv;

          INSERT INTO TABLE trips_orc
          SELECT * FROM trips_csv;

          TRUNCATE TABLE trips_csv;" | hive
done < /vagrant/trips.manifest

I ran this process on a mechanical drive and it took around 14 hours to complete.
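
Given how long the import takes, it can help to preview the statements for a single file before committing to the full loop. The sketch below prints the same three HiveQL statements used above for a given file name without running them. The hive_import_sql function name is made up for this example.

```shell
# Print the HiveQL that loads one file ($1, already copied to /tmp on
# HDFS) into trips_csv, copies it into trips_orc and empties trips_csv.
hive_import_sql() {
    cat <<EOF
LOAD DATA INPATH '/tmp/$1' INTO TABLE trips_csv;
INSERT INTO TABLE trips_orc SELECT * FROM trips_csv;
TRUNCATE TABLE trips_csv;
EOF
}

# Preview the statements for the first file in the manifest.
hive_import_sql trips_xaa.csv.gz
```

Once the output looks right, piping it into hive should behave the same as the echo in the loop above.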

Querying with Hive

The SQL features available differ between Hive and Presto so the Presto-equivalent SQL queries shown later on in this blog post will differ slightly from the Hive SQL queries shown below.

The following completed in 11 minutes and 36 seconds:

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 13 minutes and 43 seconds:

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 15 minutes and 14 seconds:

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 23 minutes and 48 seconds:

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

Presto Up and Running

I now want to query the same ORC-formatted data using Presto. Presto requires Java 8 so I'll install that first.

$ add-apt-repository ppa:webupd8team/java
$ apt-get update
$ apt-get install oracle-java8-installer

Facebook aim to release a new version of Presto every two weeks so check to see which version is the most recent when installing from their gzip'ed binary distribution.

$ cd ~
$ wget -c https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.138/presto-server-0.138.tar.gz
$ tar xzf presto-server-0.138.tar.gz

Presto requires a data folder for it to store locks, logs and a few other items and also requires a number of configuration files before it can begin to work properly.

$ mkdir -p /root/datap
$ mkdir -p ~/presto-server-0.138/etc/catalog
$ cd ~/presto-server-0.138/etc

We need to create six configuration files. Below is an outline of where they live within the ~/presto-server-0.138/etc folder:

$ tree ~/presto-server-0.138/etc
etc
|-- catalog
|   |-- hive.properties
|   `-- jmx.properties
|-- config.properties
|-- jvm.config
|-- log.properties
`-- node.properties

Here are the commands to set the contents of each of the configuration files.

Below is the general server configuration. It will set up Presto to work in a standalone mode. In a real-world scenario you'd want Presto running on several machines in order to maximise performance.

$ vi ~/presto-server-0.138/etc/config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=800MB
query.max-memory-per-node=200MB
discovery-server.enabled=true
discovery.uri=http://127.0.0.1:8080

JVM configuration:

$ vi ~/presto-server-0.138/etc/jvm.config
-server
-Xmx800M
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:OnOutOfMemoryError=kill -9 %p

Logging settings:

$ vi ~/presto-server-0.138/etc/log.properties
com.facebook.presto=INFO

Node settings:

$ vi ~/presto-server-0.138/etc/node.properties
node.environment=dev
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/root/datap

Catalog settings for Java Management Extensions (JMX):

$ vi ~/presto-server-0.138/etc/catalog/jmx.properties
connector.name=jmx

Catalog settings for Hive:

$ vi ~/presto-server-0.138/etc/catalog/hive.properties
hive.metastore.uri=thrift://bigtop1.docker:9083
connector.name=hive-hadoop2

With those in place you can launch Presto's server.

$ ~/presto-server-0.138/bin/launcher start

It should expose a web frontend on port 8080 if it's running properly.

$ curl localhost:8080

The following will set up the CLI tool for Presto:

$ cd ~
$ wget -c https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.138/presto-cli-0.138-executable.jar
$ mv presto-cli-0.138-executable.jar presto
$ chmod +x presto

Querying with Presto

It's no surprise that Presto will outperform Hive for many types of queries. Presto compiles queries down to byte code before the JVM turns it into machine code, uses flat data structures that help eliminate almost all garbage collection issues and uses pipeline execution for interactive results.

Bar some syntax changes, I'll run the same queries on Presto that I ran on Hive and see how much quicker they perform.

$ ~/presto \
    --server localhost:8080 \
    --catalog hive \
    --schema default

The following completed in 1 minute and 57 seconds (~6x faster than Hive):

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 2 minutes and 57 seconds (~4.5x faster than Hive):

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 4 minutes and 13 seconds (~3.5x faster than Hive):

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 5 minutes and 6 seconds (~4.5x faster than Hive):

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
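
The speed-up figures quoted above are rounded. As a quick sketch, they can be recomputed from the timings reported in this post. The to_seconds and speedup function names are my own.

```shell
# Convert a duration given as minutes ($1) and seconds ($2) into seconds.
to_seconds() {
    echo $(( $1 * 60 + $2 ))
}

# Print how many times faster the second duration ($2) is than the
# first ($1), both in seconds, to one decimal place.
speedup() {
    awk -v h="$1" -v p="$2" 'BEGIN { printf "%.1f\n", h / p }'
}

# Hive timing first, Presto timing second, in the order the queries ran.
echo "Query 1: $(speedup "$(to_seconds 11 36)" "$(to_seconds 1 57)")x"
echo "Query 2: $(speedup "$(to_seconds 13 43)" "$(to_seconds 2 57)")x"
echo "Query 3: $(speedup "$(to_seconds 15 14)" "$(to_seconds 4 13)")x"
echo "Query 4: $(speedup "$(to_seconds 23 48)" "$(to_seconds 5 6)")x"
```

This works out to roughly 5.9x, 4.6x, 3.6x and 4.7x respectively, in line with the rounded figures given with each query.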

Copyright © 2014 - 2017 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.