Posted on Tue 27 March 2018

1.1 Billion Taxi Rides: EC2 versus EMR

In this blog post I wanted to take two ways of running Hadoop jobs that cost less than $3.00 / hour on Amazon Web Services (AWS) and see how well they compare in terms of performance.

The $3.00 price point was driven by the first method: running a single-node Hadoop installation. I wanted to make sure the dataset used in this benchmark could easily fit into memory.

The price set the limit for the second method: AWS EMR. This is Amazon's Hadoop Platform offering. It has a huge feature set but the key one is that it lets you set up Hadoop clusters with very little instruction. The $3.00 price limit includes the service fee for EMR.

Note I'll be running everything in AWS' Irish region and the prices mentioned are region-specific and, in the case of spot prices, time-specific.

The dataset I'll be using in the benchmarks is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six-year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put the dataset together.

This is the same dataset I've used to benchmark Amazon Athena, BigQuery, BrytlytDB, ClickHouse, Elasticsearch, EMR, kdb+/q, MapD, PostgreSQL, Redshift and Vertica. I have a single-page summary of all these benchmarks for comparison.

The Hardware Setups

The single-node machine will be an i3.8xlarge running in eu-west-1. It has 32 vCPUs, 244 GB of RAM, 4 x 1.9 TB NVMe SSDs and 10 Gb/s networking (which should see almost no use). It costs $2.752 / hour as a reserved instance in Ireland.

The EMR cluster will be made up of 21 m3.xlarge instances which have a combined 84 vCPUs, 315 GB of RAM, 1,680 GB of SSDs and "High" networking speeds between nodes (which will be used a lot). It costs $2.96 / hour using 20 spot instances and one reserved instance in Ireland.

Note that no one EMR instance has more than 15 GB of RAM or 68.95 GB of HDFS capacity per machine so data locality and per-machine limits will come into play. That said the cluster has 21x any one machine's memory bandwidth and CPU clock cycles collectively.
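The cluster totals quoted above follow from the per-instance figures. Below is a quick sketch of that arithmetic. It assumes the published m3.xlarge specification of 4 vCPUs, 15 GB of RAM and 2 x 40 GB of SSD storage per instance; the vCPU count and SSD layout are not stated in this post and the names used are only for illustration.

```python
# Per-instance figures for an m3.xlarge. The vCPU and SSD numbers are
# assumptions taken from AWS' published specs, not measured in this post.
M3_XLARGE = {"vcpus": 4, "ram_gb": 15, "ssd_gb": 2 * 40}
NODES = 21  # one master, ten core and ten task nodes


def cluster_totals(instance, nodes):
    """Multiply each per-instance resource by the node count."""
    return {name: amount * nodes for name, amount in instance.items()}


totals = cluster_totals(M3_XLARGE, NODES)
print(totals)
```

The result only describes aggregate capacity. No single task can use more than one node's share of it without going over the network.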

Launching an i3.8xlarge Instance

For this benchmark I launched an i3.8xlarge EC2 instance in eu-west-1 using the Ubuntu 16 ami-1b791862 AMI image provided by Amazon. It took less than a minute from request till I was able to connect via SSH.

$ ssh -i ~/.ssh/emr4.pem \
      ubuntu@ec2-34-252-59-151.eu-west-1.compute.amazonaws.com

The machine reports having an Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz which has a memory controller supporting DDR4-2400 at a throughput of 71.53 GB/s. This instance sits on shared infrastructure so it's uncertain how much of this hardware was available to me during my benchmark.

$ lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                32
On-line CPU(s) list:   0-31
Thread(s) per core:    2
Core(s) per socket:    16
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 79
Model name:            Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
Stepping:              1
CPU MHz:               1200.312
CPU max MHz:           3000.0000
CPU min MHz:           1200.0000
BogoMIPS:              4600.08
Hypervisor vendor:     Xen
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              46080K
NUMA node0 CPU(s):     0-31
Flags:                 fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq monitor est ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single kaiser fsgsbase bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx xsaveopt ida

The NVMe SSD storage comes unmounted so only the 8 GB root partition will be available at launch.

$ lsblk
NAME    MAJ:MIN RM  SIZE RO TYPE MOUNTPOINT
xvda    202:0    0    8G  0 disk
└─xvda1 202:1    0    8G  0 part /
nvme0n1 259:0    0  1.7T  0 disk
nvme1n1 259:1    0  1.7T  0 disk
nvme2n1 259:3    0  1.7T  0 disk
nvme3n1 259:2    0  1.7T  0 disk

This is the meminfo report for the 244 GB of RAM.

$ cat /proc/meminfo
MemTotal:       251754696 kB
MemFree:        243294804 kB
MemAvailable:   249662812 kB
Buffers:           58240 kB
Cached:          7303624 kB
SwapCached:            0 kB
Active:          2633300 kB
Inactive:        4899236 kB
Active(anon):     173632 kB
Inactive(anon):     8396 kB
Active(file):    2459668 kB
Inactive(file):  4890840 kB
Unevictable:        3652 kB
Mlocked:            3652 kB
SwapTotal:             0 kB
SwapFree:              0 kB
Dirty:               128 kB
Writeback:             0 kB
AnonPages:        174476 kB
Mapped:            53872 kB
Shmem:              8848 kB
Slab:             358620 kB
SReclaimable:     279248 kB
SUnreclaim:        79372 kB
KernelStack:        8512 kB
PageTables:         3392 kB
NFS_Unstable:          0 kB
Bounce:                0 kB
WritebackTmp:          0 kB
CommitLimit:    125877348 kB
Committed_AS:    1107628 kB
VmallocTotal:   34359738367 kB
VmallocUsed:           0 kB
VmallocChunk:          0 kB
HardwareCorrupted:     0 kB
AnonHugePages:    129024 kB
CmaTotal:              0 kB
CmaFree:               0 kB
HugePages_Total:       0
HugePages_Free:        0
HugePages_Rsvd:        0
HugePages_Surp:        0
Hugepagesize:       2048 kB
DirectMap4k:      102400 kB
DirectMap2M:     3043328 kB
DirectMap1G:    254803968 kB

Installing Hadoop 3

I'll be following the installation notes from my Hadoop 3 Single-Node Install Guide blog post but there were a few steps I needed to change for this EC2 instance. Below I've listed each of these changes to those instructions.

First, I installed the AWS CLI tool so I could download the dataset from S3 that I'll be using in this benchmark.

$ sudo apt install awscli

The following is how I set my AWS credentials.

$ aws configure

This lets the AWS CLI make 100 concurrent requests and will speed up S3 access tremendously.

$ aws configure set \
    default.s3.max_concurrent_requests \
    100
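To double-check that the setting took effect, the CLI's config file can be inspected. The sketch below assumes the CLI stores nested S3 settings in ~/.aws/config in the indented form shown in SAMPLE. The sample contents, the region line and the helper name s3_settings are illustrative and were not copied from this machine.

```python
import configparser

# Illustrative contents of ~/.aws/config after running the command above.
# The region line is an assumption made for the sake of the example.
SAMPLE = """\
[default]
region = eu-west-1
s3 =
    max_concurrent_requests = 100
"""


def s3_settings(config_text, profile="default"):
    """Return the nested s3 settings of a profile as a dict of strings."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    # The indented lines under "s3 =" arrive as one multi-line value.
    nested = parser.get(profile, "s3", fallback="")
    settings = {}
    for line in nested.splitlines():
        if "=" in line:
            key, value = line.split("=", 1)
            settings[key.strip()] = value.strip()
    return settings


print(s3_settings(SAMPLE))
```

To run it against the real file, read ~/.aws/config into a string and pass that in instead of SAMPLE.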

There is an existing authorised key for root's account so I appended rather than replaced the authorized_keys file.

$ sudo su
$ ssh-keygen
$ cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys

Oracle's Java Distribution wouldn't download so I needed to go for OpenJDK.

Connecting to download.oracle.com (download.oracle.com)|23.40.96.162|:80... connected.
HTTP request sent, awaiting response... 404 Not Found
2018-03-24 07:42:24 ERROR 404: Not Found.

The following installs OpenJDK.

$ sudo apt install \
    openjdk-8-jre \
    openjdk-8-jdk-headless

I then needed to change JAVA_HOME in the common environment settings file.

$ sudo vi /etc/profile
export JAVA_HOME=/usr

Mounting the NVMe SSDs

Below will format the first NVMe drive (/dev/nvme0n1) with the EXT4 file system, mount it at /mnt/nvme and grant the ubuntu user access to everything under that mount point.

$ sudo mkfs.ext4 -E nodiscard /dev/nvme0n1
$ sudo mkdir /mnt/nvme
$ sudo mount -o discard /dev/nvme0n1 /mnt/nvme
$ sudo chown -R ubuntu /mnt/nvme

The system now reports 1.9 TB of capacity on the /mnt/nvme mount.

$ df -H
Filesystem      Size  Used Avail Use% Mounted on
udev            129G     0  129G   0% /dev
tmpfs            26G  9.1M   26G   1% /run
/dev/xvda1      8.3G  4.9G  3.4G  59% /
tmpfs           129G     0  129G   0% /dev/shm
tmpfs           5.3M     0  5.3M   0% /run/lock
tmpfs           129G     0  129G   0% /sys/fs/cgroup
tmpfs            26G     0   26G   0% /run/user/1000
/dev/nvme0n1    1.9T   72M  1.8T   1% /mnt/nvme

Setting Up HDFS on the NVMe Array

In the Hadoop 3 Install Guide /opt/hdfs is used for HDFS storage. The following uses NVMe-based folders instead.

I'll first create those folders.

$ mkdir -p /mnt/nvme/{datanode,namenode}

Then I'll replace the hdfs-site.xml config file with the following.

$ sudo vi /opt/hadoop/etc/hadoop/hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/mnt/nvme/datanode</value>
        <final>true</final>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/mnt/nvme/namenode</value>
        <final>true</final>
    </property>
    <property>
        <name>dfs.namenode.http-address</name>
        <value>localhost:50070</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

The replication factor has been set to one since there is a single array visible to HDFS.
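A config file like this is easy to get subtly wrong, so it can help to read the values back programmatically. Below is a minimal sketch using Python's standard library. The embedded XML is a trimmed copy of the file above and the helper name read_properties is my own, not part of Hadoop.

```python
import xml.etree.ElementTree as ET

# A trimmed copy of the hdfs-site.xml shown above (XML prolog omitted).
HDFS_SITE = """
<configuration>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/mnt/nvme/datanode</value>
        <final>true</final>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/mnt/nvme/namenode</value>
        <final>true</final>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
"""


def read_properties(xml_text):
    """Map each <name> to its <value> in a Hadoop-style config."""
    root = ET.fromstring(xml_text)
    return {
        prop.findtext("name"): prop.findtext("value")
        for prop in root.iter("property")
    }


props = read_properties(HDFS_SITE)
# Both storage directories should sit under the NVMe mount point.
assert props["dfs.datanode.data.dir"].startswith("/mnt/nvme/")
assert props["dfs.namenode.name.dir"].startswith("/mnt/nvme/")
print("replication factor:", props["dfs.replication"])
```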

After launching HDFS I allowed the ubuntu user access to the entire file system.

$ hdfs dfs -chown ubuntu /

I then made sure the HDFS capacity was a number that looked sensible.

$ hdfs dfsadmin -report \
    | grep 'Configured Capacity' \
    | tail -n1
Configured Capacity: 1870051708928 (1.70 TB)
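The 1.70 TB reported here and the 1.9T shown by df -H describe the same drive in different units. df -H counts in powers of 1,000, while the figure in brackets above appears to be in powers of 1,024. The sketch below converts the byte count both ways; the function name is only for illustration.

```python
def bytes_to_units(n_bytes):
    """Return (decimal terabytes, binary tebibytes) for a byte count."""
    return n_bytes / 1000**4, n_bytes / 1024**4


configured = 1870051708928  # bytes, from the dfsadmin report above
tb, tib = bytes_to_units(configured)
print(f"{tb:.2f} TB (decimal), {tib:.2f} TiB (binary)")
```

The decimal figure is a little below what df reports for the mount, which I would put down to df rounding and to space HDFS does not count as configured capacity.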

128 GB of Memory for Spark

The dataset is around 84 GB in ORC format. I wasn't sure how much overhead on top of that Spark would need to hold it in memory. I settled on 128 GB. Later on I ran queries that took 20+ seconds to complete and I saw no disk traffic with iotop so I'm convinced this was enough.

$ sudo vi /opt/spark/conf/spark-env.sh
SPARK_EXECUTOR_MEMORY=128g
SPARK_DRIVER_MEMORY=128g
SPARK_WORKER_MEMORY=128g
SPARK_DAEMON_MEMORY=128g

Downloading Data onto HDFS

The dataset I'll be using is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six-year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put the dataset together.

In past blogs I've shown how I've converted this dataset into ORC format and the reasons behind it so I won't go into detail here. Below I've downloaded the ORC files I have saved on AWS S3. I then created a folder on HDFS and copied the files there.

$ mkdir -p /mnt/nvme/orc
$ cd /mnt/nvme

$ aws s3 sync \
    s3://<bucket>/orc/ \
    /mnt/nvme/orc/

$ hdfs dfs -mkdir /trips_orc/
$ hdfs dfs -copyFromLocal \
    /mnt/nvme/orc/* \
    /trips_orc/

Another tool that can be used to copy from S3 to HDFS is S3DistCp. I hope to review it in a later blog post.

Setting Up Tables in Hive

Hive will tie the data on HDFS to the table schema I'll interact with in Spark and Presto.

$ hive
CREATE EXTERNAL TABLE trips_orc (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION '/trips_orc/';

Spark Benchmark Results

Spark's Master binds to a single hostname when it launches. Below I searched through its logs to find which hostname it picked.

$ grep 'registered with master' \
    /opt/spark/logs/spark-root-org.apache.spark.deploy.*
/opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-ip-172-30-0-18.out 2018-03-24 08:03:19 INFO  Worker:54 - Successfully registered with master spark://ip-172-30-0-18.eu-west-1.compute.internal:7077

I then used that hostname when launching the Spark SQL client.

$ spark-sql \
    --master spark://ip-172-30-0-18.eu-west-1.compute.internal:7077 \
    --num-executors 1

Below are the fastest times I saw for each query. I made sure to run each query repeatedly and monitored disk activity via iotop to ensure data was being read from memory and not the NVMe storage array during the fastest run.

The following completed in 22 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 25 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 27 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 65 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

While running these queries I could see the CPU was becoming something of a bottleneck.

top - 08:54:14 up  1:50,  4 users,  load average: 0.20, 1.64, 6.43
Tasks: 360 total,   2 running, 358 sleeping,   0 stopped,   0 zombie
%Cpu0  : 94.7 us,  1.7 sy,  0.0 ni,  3.3 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
%Cpu1  : 95.0 us,  1.7 sy,  0.0 ni,  3.4 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu2  : 98.3 us,  0.3 sy,  0.0 ni,  1.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu3  : 87.3 us,  4.3 sy,  0.0 ni,  8.4 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu4  : 95.0 us,  1.3 sy,  0.0 ni,  3.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu5  : 98.3 us,  0.0 sy,  0.0 ni,  1.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu6  : 96.7 us,  1.3 sy,  0.0 ni,  2.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu7  : 92.7 us,  1.0 sy,  0.0 ni,  5.6 id,  0.3 wa,  0.0 hi,  0.3 si,  0.0 st
%Cpu8  : 93.7 us,  1.3 sy,  0.0 ni,  5.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu9  : 92.3 us,  0.7 sy,  0.0 ni,  7.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu10 : 97.3 us,  0.7 sy,  0.0 ni,  2.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu11 : 97.3 us,  0.7 sy,  0.0 ni,  2.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu12 : 92.0 us,  3.0 sy,  0.0 ni,  5.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu13 : 94.9 us,  1.0 sy,  0.0 ni,  4.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu14 : 88.3 us,  3.0 sy,  0.0 ni,  8.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu15 : 92.6 us,  2.3 sy,  0.0 ni,  4.7 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
%Cpu16 : 94.7 us,  2.3 sy,  0.0 ni,  2.6 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
%Cpu17 : 93.0 us,  0.7 sy,  0.0 ni,  6.0 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
%Cpu18 : 93.0 us,  3.7 sy,  0.0 ni,  3.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu19 : 91.2 us,  0.7 sy,  0.0 ni,  8.1 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st

At first I was wondering if the NVMe driver was causing too much overhead but disk activity was nil when I captured the above output from top. Without putting together a flame graph I suspect this is just the CPU pulling data from memory and aggregating it.

The memory usage on the system supports this.

top - 08:56:57 up  1:53,  4 users,  load average: 21.15, 9.72, 8.75
Tasks: 360 total,   1 running, 359 sleeping,   0 stopped,   0 zombie
%Cpu(s): 92.9 us,  1.5 sy,  0.0 ni,  5.5 id,  0.0 wa,  0.0 hi,  0.1 si,  0.0 st
KiB Mem : 25175468+total, 40119300 free, 68494352 used, 14314105+buff/cache
KiB Swap:        0 total,        0 free,        0 used. 18175958+avail Mem

   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
 98118 root      20   0  0.138t 0.047t  33032 S  2976 20.2 176:49.70 /usr/bin/java -+
 19797 root      20   0 32.913g 390216  25668 S  34.9  0.2   9:42.22 /usr/bin/java -+

Process 98118 is the Spark Master and 19797 is the HDFS Datanode.
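top reports a process's %CPU summed across cores by default, so a value in the thousands is expected on a 32-vCPU machine. A small sketch of how the 2976 figure relates to the per-core numbers, with a function name of my own choosing:

```python
def per_vcpu_utilisation(process_cpu_percent, vcpus):
    """Spread a summed %CPU figure from top across all vCPUs."""
    return process_cpu_percent / vcpus


# 2976 is the %CPU top showed for process 98118; 32 is the vCPU count.
share = per_vcpu_utilisation(2976, 32)
print(f"{share:.1f}% average utilisation per vCPU")
```

That lines up with the 92.9% user time in the summary line of the same top output.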

When this benchmark was finished I shut down Spark's services in order to give Presto as many resources as possible.

$ sudo /opt/spark/sbin/stop-master.sh
$ sudo /opt/spark/sbin/stop-slaves.sh

Presto Benchmark Results

Below are the fastest times I saw running the following queries using Presto.

$ presto \
    --server localhost:8080 \
    --catalog hive \
    --schema default

The following completed in 11 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 14 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 16 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 22 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

Launching an AWS EMR Cluster

As of version 5.12.0, AWS EMR supports neither Hadoop 3.0.0 nor Spark 2.3.0. On a single node with a replication factor of one I haven't seen significant performance differences between Spark 2.2.1 and 2.3.0, but I have yet to benchmark HDFS on Hadoop 2.8.x against 3.0.0.

The Presto (0.188) and Hive (2.3.2) versions match the single-instance setup.

It's also important to note that I didn't come across any EC2 instance types supported by EMR that came with NVMe drives. Regular SSDs were the fastest storage I could find.

The following will launch an EMR cluster with a single master node, ten data nodes and ten task nodes. With the exception of the master node, all nodes will be spot instances to keep the cost down.

$ aws emr create-cluster \
    --applications \
        Name=Hadoop \
        Name=Hive \
        Name=Presto \
        Name=Spark \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --ebs-root-volume-size 10 \
    --ec2-attributes '{
            "KeyName": "emr",
            "InstanceProfile": "EMR_EC2_DefaultRole",
            "SubnetId": "subnet-6a55a532",
            "EmrManagedSlaveSecurityGroup": "sg-9f2607f9",
            "EmrManagedMasterSecurityGroup": "sg-902607f6"
        }' \
    --enable-debugging \
    --log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
    --instance-groups '[{
            "InstanceCount": 10,
            "BidPrice": "1",
            "InstanceGroupType": "CORE",
            "InstanceType": "m3.xlarge",
            "Name": "Core - 2"
        }, {
            "InstanceCount": 1,
            "InstanceGroupType": "MASTER",
            "InstanceType": "m3.xlarge",
            "Name": "Master - 1"
        }, {
            "InstanceCount": 10,
            "BidPrice": "1",
            "InstanceGroupType": "TASK",
            "InstanceType": "m3.xlarge",
            "Name": "Task - 3"
        }]' \
    --name 'My cluster' \
    --region eu-west-1 \
    --release-label emr-5.12.0 \
    --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
    --service-role EMR_DefaultRole \
    --termination-protected
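The instance group definitions are plain JSON, so the node counts can be sanity-checked before launching anything. The sketch below parses a copy of the groups above and treats any group carrying a BidPrice as spot. The helper name count_instances is illustrative.

```python
import json

# A copy of the instance group definitions passed to --instance-groups.
INSTANCE_GROUPS = json.loads("""
[{"InstanceCount": 10, "BidPrice": "1", "InstanceGroupType": "CORE",
  "InstanceType": "m3.xlarge", "Name": "Core - 2"},
 {"InstanceCount": 1, "InstanceGroupType": "MASTER",
  "InstanceType": "m3.xlarge", "Name": "Master - 1"},
 {"InstanceCount": 10, "BidPrice": "1", "InstanceGroupType": "TASK",
  "InstanceType": "m3.xlarge", "Name": "Task - 3"}]
""")


def count_instances(groups):
    """Return (total, spot) counts; a group with a BidPrice is spot."""
    total = sum(g["InstanceCount"] for g in groups)
    spot = sum(g["InstanceCount"] for g in groups if "BidPrice" in g)
    return total, spot


total, spot = count_instances(INSTANCE_GROUPS)
print(f"{total} instances: {spot} spot, {total - spot} not spot")
```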

With the cluster provisioned and bootstrapped I was able to SSH in.

$ ssh -i ~/.ssh/emr.pem \
    hadoop@ec2-54-194-43-34.eu-west-1.compute.amazonaws.com
       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2017.09-release-notes/
4 package(s) needed for security, out of 9 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

The first thing I checked was the HDFS cluster's capacity. The replication factor will be set to three so 240 GB across the 689 GB available will be used for this dataset.

$ hdfs dfsadmin -report \
    | grep 'Configured Capacity'
Configured Capacity: 740336517120 (689.49 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
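The report can be cross-checked with a little arithmetic. The sketch below takes the byte counts from the output above and the rounded 84 GB dataset size quoted earlier in this post. Because 84 GB is a rounded figure, the replicated footprint it produces is an estimate and comes out a little above the number I quoted; the exact on-disk size of the ORC files would be needed for a precise value.

```python
PER_NODE_BYTES = 74033651712   # each datanode in the report above
TOTAL_BYTES = 740336517120     # cluster-wide configured capacity
DATASET_GB = 84                # approximate ORC size quoted earlier
REPLICATION = 3


def replicated_footprint(dataset_gb, replication):
    """Raw storage used once every block is stored `replication` times."""
    return dataset_gb * replication


datanodes = TOTAL_BYTES // PER_NODE_BYTES
capacity_gb = TOTAL_BYTES / 1024**3
footprint_gb = replicated_footprint(DATASET_GB, REPLICATION)
print(f"{datanodes} datanodes, {capacity_gb:.2f} GB capacity")
print(f"~{footprint_gb} GB used, {footprint_gb / capacity_gb:.0%} of capacity")
```

Only the ten core nodes show up as datanodes. The master and the task nodes contribute no HDFS capacity in this report.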

I then created a folder on HDFS for the taxi trip dataset.

$ hdfs dfs -mkdir /trips_orc/

The primary drive on this system only had around 5.8 GB of capacity available and I needed to use it to transfer data from S3 to HDFS.

$ df -H
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs        7.8G   29k  7.8G   1% /dev
tmpfs           7.8G     0  7.8G   0% /dev/shm
/dev/xvda1       11G  4.7G  5.8G  45% /
/dev/xvdb1      5.4G   35M  5.4G   1% /emr
/dev/xvdb2       35G  348M   35G   1% /mnt
/dev/xvdc        41G   35M   41G   1% /mnt1

I ended up using the following script, which downloads the 56 ORC files one at a time and deletes each one from the primary drive once it is on HDFS.

$ PREFIX='s3://<bucket>/orc/'
$ ZEROS='0000'
$ SUFFIX='_0'

$ for i in {00..55}; do
      aws s3 cp \
        $PREFIX$ZEROS$i$SUFFIX \
        ./
      hdfs dfs -copyFromLocal \
        $ZEROS$i$SUFFIX \
        /trips_orc/
      rm $ZEROS$i$SUFFIX
  done
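The loop relies on bash's zero-padded brace expansion ({00..55}) to build the file names. For anyone who wants to check the list of names before copying anything, here is a sketch of the same naming scheme in Python. The function name and its defaults are illustrative and mirror the ZEROS and SUFFIX variables above.

```python
def orc_file_names(count=56, zeros="0000", suffix="_0"):
    """Build the names the shell loop iterates over, e.g. 000000_0."""
    return [f"{zeros}{i:02d}{suffix}" for i in range(count)]


names = orc_file_names()
print(names[0], names[-1], len(names))
```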

Creating Tables in Hive on AWS EMR

Below I created the table that points to the ORC data stored across the HDFS cluster. This SQL matches the SQL used to create the single-node table.

$ hive
CREATE EXTERNAL TABLE trips_orc (
        trip_id                 INT,
        vendor_id               STRING,
        pickup_datetime         TIMESTAMP,
        dropoff_datetime        TIMESTAMP,
        store_and_fwd_flag      STRING,
        rate_code_id            SMALLINT,
        pickup_longitude        DOUBLE,
        pickup_latitude         DOUBLE,
        dropoff_longitude       DOUBLE,
        dropoff_latitude        DOUBLE,
        passenger_count         SMALLINT,
        trip_distance           DOUBLE,
        fare_amount             DOUBLE,
        extra                   DOUBLE,
        mta_tax                 DOUBLE,
        tip_amount              DOUBLE,
        tolls_amount            DOUBLE,
        ehail_fee               DOUBLE,
        improvement_surcharge   DOUBLE,
        total_amount            DOUBLE,
        payment_type            STRING,
        trip_type               SMALLINT,
        pickup                  STRING,
        dropoff                 STRING,

        cab_type                STRING,

        precipitation           SMALLINT,
        snow_depth              SMALLINT,
        snowfall                SMALLINT,
        max_temperature         SMALLINT,
        min_temperature         SMALLINT,
        average_wind_speed      SMALLINT,

        pickup_nyct2010_gid     SMALLINT,
        pickup_ctlabel          STRING,
        pickup_borocode         SMALLINT,
        pickup_boroname         STRING,
        pickup_ct2010           STRING,
        pickup_boroct2010       STRING,
        pickup_cdeligibil       STRING,
        pickup_ntacode          STRING,
        pickup_ntaname          STRING,
        pickup_puma             STRING,

        dropoff_nyct2010_gid    SMALLINT,
        dropoff_ctlabel         STRING,
        dropoff_borocode        SMALLINT,
        dropoff_boroname        STRING,
        dropoff_ct2010          STRING,
        dropoff_boroct2010      STRING,
        dropoff_cdeligibil      STRING,
        dropoff_ntacode         STRING,
        dropoff_ntaname         STRING,
        dropoff_puma            STRING
    ) STORED AS orc
      LOCATION '/trips_orc/';

Spark SQL on EMR Benchmark Results

The following were the fastest times I saw after running each query multiple times.

$ spark-sql

The following completed in 28 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 31 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 33 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 80 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

Parquet is a more performant format for Spark but running those benchmarks didn't fit in the time box for this blog. I'll revisit Spark and Parquet once AWS EMR updates Spark to 2.3.0. ORC performance is supposed to be 2-5x faster with that release as well.

Presto on EMR Benchmark Results

The following were the fastest times I saw after running each query multiple times.

$ presto-cli \
    --catalog hive \
    --schema default

The following completed in 4.88 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 11 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 12 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 15 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

Closing Thoughts

Below is a recap of the fastest times, in seconds, seen in the benchmarks above sorted by query 1.

Query 1   Query 2   Query 3   Query 4   Setup
   4.88        11        12        15   EMR 5.12.0 & Presto 0.188
     11        14        16        22   i3.8xlarge & Presto 0.188
     22        25        27        65   i3.8xlarge & Spark 2.3.0
     28        31        33        80   EMR 5.12.0 & Spark 2.2.1

The increased RAM bandwidth and compute power of the cluster did a lot to speed up Presto's query 1 performance. Though costs between the two solutions are close the EMR cluster has the added benefit of redundancy.
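To put numbers on the gap, the sketch below works out the per-query speed-ups from the recap times and the cost of keeping each setup running for the length of one query. The times and hourly prices are the ones quoted in this post. The function names are my own, and the cost figure ignores start-up time and billing granularity.

```python
# Fastest times in seconds for queries 1-4, from the recap above.
TIMES = {
    "EMR 5.12.0 & Presto 0.188": [4.88, 11, 12, 15],
    "i3.8xlarge & Presto 0.188": [11, 14, 16, 22],
    "i3.8xlarge & Spark 2.3.0": [22, 25, 27, 65],
    "EMR 5.12.0 & Spark 2.2.1": [28, 31, 33, 80],
}
# Hourly prices quoted in the hardware section of this post.
HOURLY_USD = {"EMR": 2.96, "i3.8xlarge": 2.752}


def speedup(slower, faster):
    """How many times faster `faster` is than `slower`, per query."""
    return [round(s / f, 2) for s, f in zip(slower, faster)]


def cost_per_query(seconds, hourly_usd):
    """Cost of running the setup for the duration of each query."""
    return [round(s * hourly_usd / 3600, 4) for s in seconds]


presto_gain = speedup(TIMES["i3.8xlarge & Presto 0.188"],
                      TIMES["EMR 5.12.0 & Presto 0.188"])
spark_gain = speedup(TIMES["EMR 5.12.0 & Spark 2.2.1"],
                     TIMES["i3.8xlarge & Spark 2.3.0"])
print("Presto, EMR over i3.8xlarge:", presto_gain)
print("Spark, i3.8xlarge over EMR:", spark_gain)
print("EMR Presto cost per query (USD):",
      cost_per_query(TIMES["EMR 5.12.0 & Presto 0.188"], HOURLY_USD["EMR"]))
```

Keep in mind the two Spark rows differ in Spark version as well as hardware, so that ratio is not a pure hardware comparison.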

It's worth noting EMR's setup time. It can take up to 25 minutes for a cluster to become fully functional. Had I used an AMI for the single-node instance I would have been up and running from the moment I could SSH in. If you provision daily and spend four hours working in Spark between meetings, a 25-minute wait is roughly a tenth of that time spent waiting for infrastructure.

That being said what EMR is doing during bootstrapping is the result of years of fixing issues and making sure it's a rock-solid setup. The installation notes I put together haven't been put through their paces by Amazon's EMR customer base over several years and even one or two issues in the EC2 setup could cost a few man days of productivity for analysts and platform engineers.

As I wrote at the beginning of this blog post I wanted to compare the two methods based on performance and the winner is the EMR Cluster.

Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2017 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.