In this blog post I wanted to take two ways of running Hadoop jobs that cost less than $3.00 / hour on Amazon Web Services (AWS) and see how well they compare in terms of performance.
The $3.00 price point was driven by the first method: running a single-node Hadoop installation. I wanted to make sure the dataset used in this benchmark could easily fit into memory.
The price set the limit for the second method: AWS EMR. This is Amazon's managed Hadoop platform offering. It has a huge feature set but the key one is that it lets you set up Hadoop clusters with very little manual work. The $3.00 price limit includes the service fee for EMR.
Note that I'll be running everything in AWS' Irish region and that the prices mentioned are region-specific and, in the case of spot prices, time-specific.
The dataset I'll be using in the benchmarks is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put the dataset together.
This is the same dataset I've used to benchmark Amazon Athena, BigQuery, BrytlytDB, ClickHouse, Elasticsearch, EMR, kdb+/q, MapD, PostgreSQL, Redshift and Vertica. I have a single-page summary of all these benchmarks for comparison.
The Hardware Setups
The single-node machine will be an i3.8xlarge running in eu-west-1. It has 32 vCPUs, 244 GB of RAM, 4 x 1.9 TB NVMe SSDs and 10 Gb/s networking (which should see almost no use) and costs $2.752 / hour as an on-demand instance in Ireland.
The EMR cluster will be made up of 21 m3.xlarge instances with a combined 84 vCPUs, 315 GB of RAM, 1,680 GB of SSD storage and "High" networking speeds between nodes (which will be used a lot). It costs $2.96 / hour using 20 spot instances and one on-demand instance in Ireland.
Note that no single EMR instance has more than 15 GB of RAM or 68.95 GB of HDFS capacity, so data locality and per-machine limits will come into play. That said, collectively the cluster has 21x the memory bandwidth and CPU clock cycles of any one of its machines.
Launching an i3.8xlarge Instance
For this benchmark I launched an i3.8xlarge EC2 instance in eu-west-1 using Amazon's Ubuntu 16 AMI ami-1b791862. It took less than a minute from request until I was able to connect via SSH.
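For anyone scripting this step, a roughly equivalent launch via the AWS CLI would look like the sketch below. The subnet and security group flags I used are omitted and the key pair name is an assumption based on the .pem file used next.
$ aws ec2 run-instances \
--image-id ami-1b791862 \
--instance-type i3.8xlarge \
--key-name emr4 \
--region eu-west-1
With the instance up, I connected in via SSH.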
$ ssh -i ~/.ssh/emr4.pem \
ubuntu@ec2-34-252-59-151.eu-west-1.compute.amazonaws.com
The machine reports having an Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz, whose memory controller supports DDR4-2400 at a throughput of 71.53 GB/s. This instance sits on shared infrastructure so it's uncertain how much of that hardware was actually available to me during the benchmark.
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 32
On-line CPU(s) list: 0-31
Thread(s) per core: 2
Core(s) per socket: 16
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 79
Model name: Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
Stepping: 1
CPU MHz: 1200.312
CPU max MHz: 3000.0000
CPU min MHz: 1200.0000
BogoMIPS: 4600.08
Hypervisor vendor: Xen
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 46080K
NUMA node0 CPU(s): 0-31
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq monitor est ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single kaiser fsgsbase bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx xsaveopt ida
The NVMe SSD storage comes unmounted so only the 8 GB root partition will be available at launch.
$ lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
xvda 202:0 0 8G 0 disk
└─xvda1 202:1 0 8G 0 part /
nvme0n1 259:0 0 1.7T 0 disk
nvme1n1 259:1 0 1.7T 0 disk
nvme2n1 259:3 0 1.7T 0 disk
nvme3n1 259:2 0 1.7T 0 disk
This is the meminfo report for the 244 GB of RAM.
$ cat /proc/meminfo
MemTotal: 251754696 kB
MemFree: 243294804 kB
MemAvailable: 249662812 kB
Buffers: 58240 kB
Cached: 7303624 kB
SwapCached: 0 kB
Active: 2633300 kB
Inactive: 4899236 kB
Active(anon): 173632 kB
Inactive(anon): 8396 kB
Active(file): 2459668 kB
Inactive(file): 4890840 kB
Unevictable: 3652 kB
Mlocked: 3652 kB
SwapTotal: 0 kB
SwapFree: 0 kB
Dirty: 128 kB
Writeback: 0 kB
AnonPages: 174476 kB
Mapped: 53872 kB
Shmem: 8848 kB
Slab: 358620 kB
SReclaimable: 279248 kB
SUnreclaim: 79372 kB
KernelStack: 8512 kB
PageTables: 3392 kB
NFS_Unstable: 0 kB
Bounce: 0 kB
WritebackTmp: 0 kB
CommitLimit: 125877348 kB
Committed_AS: 1107628 kB
VmallocTotal: 34359738367 kB
VmallocUsed: 0 kB
VmallocChunk: 0 kB
HardwareCorrupted: 0 kB
AnonHugePages: 129024 kB
CmaTotal: 0 kB
CmaFree: 0 kB
HugePages_Total: 0
HugePages_Free: 0
HugePages_Rsvd: 0
HugePages_Surp: 0
Hugepagesize: 2048 kB
DirectMap4k: 102400 kB
DirectMap2M: 3043328 kB
DirectMap1G: 254803968 kB
Installing Hadoop 3
I'll be following the installation notes from my Hadoop 3 Single-Node Install Guide blog post but there were a few steps I needed to change for this EC2 instance. Below I've listed each of those changes.
First, I installed the AWS CLI tool so I could download the dataset from S3 that I'll be using in this benchmark.
$ sudo apt install awscli
The following is how I set my AWS credentials.
$ aws configure
The following lets the AWS CLI make up to 100 concurrent requests, which will speed up S3 access tremendously.
$ aws configure set \
default.s3.max_concurrent_requests \
100
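That setting ends up in the AWS CLI's configuration file. For reference, the resulting stanza in ~/.aws/config looks like this.
[default]
s3 =
    max_concurrent_requests = 100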
There is an existing authorised key for root's account so I appended rather than replaced the authorized_keys file.
$ sudo su
$ ssh-keygen
$ cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys
Oracle's Java distribution wouldn't download so I went with OpenJDK instead.
Connecting to download.oracle.com (download.oracle.com)|23.40.96.162|:80... connected.
HTTP request sent, awaiting response... 404 Not Found
2018-03-24 07:42:24 ERROR 404: Not Found.
The following installs OpenJDK.
$ sudo apt install \
openjdk-8-jre \
openjdk-8-jdk-headless
I then needed to change JAVA_HOME in the common environment settings file.
$ sudo vi /etc/profile
export JAVA_HOME=/usr
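The change to /etc/profile only applies to new login shells. To pick it up in the current session and confirm which Java will be used, something like the following works.
$ source /etc/profile
$ echo $JAVA_HOME
$ java -version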
Mounting the NVMe SSDs
The following will format the first NVMe drive with the EXT4 file system, mount it at /mnt/nvme and grant the ubuntu user access to it.
$ sudo mkfs.ext4 -E nodiscard /dev/nvme0n1
$ sudo mkdir /mnt/nvme
$ sudo mount -o discard /dev/nvme0n1 /mnt/nvme
$ sudo chown -R ubuntu /mnt/nvme
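Note the mount above won't persist across a reboot on its own. If that matters for your setup, an /etc/fstab entry along these lines would remount the drive at boot; the nofail option keeps the instance booting even if the ephemeral drive isn't available.
$ echo '/dev/nvme0n1 /mnt/nvme ext4 defaults,nofail,discard 0 2' \
| sudo tee -a /etc/fstab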
The system now reports 1.9 TB of capacity on the /mnt/nvme mount.
$ df -H
Filesystem Size Used Avail Use% Mounted on
udev 129G 0 129G 0% /dev
tmpfs 26G 9.1M 26G 1% /run
/dev/xvda1 8.3G 4.9G 3.4G 59% /
tmpfs 129G 0 129G 0% /dev/shm
tmpfs 5.3M 0 5.3M 0% /run/lock
tmpfs 129G 0 129G 0% /sys/fs/cgroup
tmpfs 26G 0 26G 0% /run/user/1000
/dev/nvme0n1 1.9T 72M 1.8T 1% /mnt/nvme
Setting Up HDFS on the NVMe Array
In the Hadoop 3 Install Guide /opt/hdfs is used for HDFS storage. The following uses NVMe-based folders instead.
I'll first create those folders.
$ mkdir -p /mnt/nvme/{datanode,namenode}
Then I'll replace the hdfs-site.xml config file with the following.
$ sudo vi /opt/hadoop/etc/hadoop/hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.datanode.data.dir</name>
<value>/mnt/nvme/datanode</value>
<final>true</final>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/mnt/nvme/namenode</value>
<final>true</final>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>localhost:50070</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
The replication factor has been set to one since there is only a single drive visible to HDFS.
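Launching HDFS itself follows the install guide. Roughly, and assuming Hadoop lives under /opt/hadoop as it does in that guide, the NameNode is formatted once and the HDFS daemons are then started as root.
$ sudo su
$ /opt/hadoop/bin/hdfs namenode -format
$ /opt/hadoop/sbin/start-dfs.sh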
After launching HDFS I allowed the ubuntu user access to the entire file system.
$ hdfs dfs -chown ubuntu /
I then checked that the configured HDFS capacity looked sensible.
$ hdfs dfsadmin -report \
| grep 'Configured Capacity' \
| tail -n1
Configured Capacity: 1870051708928 (1.70 TB)
128 GB of Memory for Spark
The dataset is around 84 GB in ORC format. I wasn't sure how much overhead Spark would need on top of that to hold it in memory, so I settled on 128 GB. Later on I ran queries that took 20+ seconds to complete and saw no disk traffic in iotop, so I'm convinced this was enough.
$ sudo vi /opt/spark/conf/spark-env.sh
SPARK_EXECUTOR_MEMORY=128g
SPARK_DRIVER_MEMORY=128g
SPARK_WORKER_MEMORY=128g
SPARK_DAEMON_MEMORY=128g
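These settings take effect when the standalone daemons are started, or restarted if they're already running. Mirroring the shutdown commands used further down, that's roughly the following.
$ sudo /opt/spark/sbin/start-master.sh
$ sudo /opt/spark/sbin/start-slaves.sh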
Downloading Data onto HDFS
In past blog posts I've shown how and why I converted this dataset into ORC format so I won't go into detail here. Below I download the ORC files I have stored on AWS S3, create a folder for them on HDFS and copy them onto it.
$ mkdir -p /mnt/nvme/orc
$ cd /mnt/nvme
$ aws s3 sync \
s3://<bucket>/orc/ \
/mnt/nvme/orc/
$ hdfs dfs -mkdir /trips_orc/
$ hdfs dfs -copyFromLocal \
/mnt/nvme/orc/* \
/trips_orc/
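Before moving on, a quick size check confirms the ORC files landed on HDFS; the total should come back at around 84 GB.
$ hdfs dfs -du -s -h /trips_orc/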
Another tool that can be used to copy from S3 to HDFS is S3DistCp. I hope to review it in a later blog post.
Setting Up Tables in Hive
Hive will tie the data on HDFS to the table schema I'll interact with in Spark and Presto.
$ hive
CREATE EXTERNAL TABLE trips_orc (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/trips_orc/';
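As a quick sanity check, counting the rows in the new table from the hive or spark-sql prompt should return roughly 1.1 billion. I include this purely as an illustration; it isn't one of the benchmark queries.
SELECT count(*) FROM trips_orc;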
Spark Benchmark Results
Spark's Master binds to a single hostname when it launches. Below I searched through its logs to find which hostname it picked.
$ grep 'registered with master' \
/opt/spark/logs/spark-root-org.apache.spark.deploy.*
/opt/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-ip-172-30-0-18.out 2018-03-24 08:03:19 INFO Worker:54 - Successfully registered with master spark://ip-172-30-0-18.eu-west-1.compute.internal:7077
I then used that hostname when launching the Spark SQL client.
$ spark-sql \
--master spark://ip-172-30-0-18.eu-west-1.compute.internal:7077 \
--num-executors 1
Below are the fastest times I saw for each query. I made sure to run each query repeatedly and monitored disk activity via iotop to ensure data was being read from memory and not the NVMe storage array during the fastest run.
The following completed in 22 seconds.
SELECT cab_type,
count(*)
FROM trips_orc
GROUP BY cab_type;
The following completed in 25 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;
The following completed in 27 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 65 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
While running these queries I could see the CPU was becoming something of a bottleneck.
top - 08:54:14 up 1:50, 4 users, load average: 0.20, 1.64, 6.43
Tasks: 360 total, 2 running, 358 sleeping, 0 stopped, 0 zombie
%Cpu0 : 94.7 us, 1.7 sy, 0.0 ni, 3.3 id, 0.0 wa, 0.0 hi, 0.3 si, 0.0 st
%Cpu1 : 95.0 us, 1.7 sy, 0.0 ni, 3.4 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu2 : 98.3 us, 0.3 sy, 0.0 ni, 1.3 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu3 : 87.3 us, 4.3 sy, 0.0 ni, 8.4 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu4 : 95.0 us, 1.3 sy, 0.0 ni, 3.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu5 : 98.3 us, 0.0 sy, 0.0 ni, 1.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu6 : 96.7 us, 1.3 sy, 0.0 ni, 2.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu7 : 92.7 us, 1.0 sy, 0.0 ni, 5.6 id, 0.3 wa, 0.0 hi, 0.3 si, 0.0 st
%Cpu8 : 93.7 us, 1.3 sy, 0.0 ni, 5.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu9 : 92.3 us, 0.7 sy, 0.0 ni, 7.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu10 : 97.3 us, 0.7 sy, 0.0 ni, 2.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu11 : 97.3 us, 0.7 sy, 0.0 ni, 2.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu12 : 92.0 us, 3.0 sy, 0.0 ni, 5.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu13 : 94.9 us, 1.0 sy, 0.0 ni, 4.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu14 : 88.3 us, 3.0 sy, 0.0 ni, 8.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu15 : 92.6 us, 2.3 sy, 0.0 ni, 4.7 id, 0.0 wa, 0.0 hi, 0.3 si, 0.0 st
%Cpu16 : 94.7 us, 2.3 sy, 0.0 ni, 2.6 id, 0.0 wa, 0.0 hi, 0.3 si, 0.0 st
%Cpu17 : 93.0 us, 0.7 sy, 0.0 ni, 6.0 id, 0.0 wa, 0.0 hi, 0.3 si, 0.0 st
%Cpu18 : 93.0 us, 3.7 sy, 0.0 ni, 3.3 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu19 : 91.2 us, 0.7 sy, 0.0 ni, 8.1 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
At first I wondered if the NVMe driver was adding too much overhead, but disk activity was nil when I captured the above output from top. Without putting together a flame graph, I suspect this is simply the CPU pulling data from memory and aggregating it.
The memory usage on the system supports this.
top - 08:56:57 up 1:53, 4 users, load average: 21.15, 9.72, 8.75
Tasks: 360 total, 1 running, 359 sleeping, 0 stopped, 0 zombie
%Cpu(s): 92.9 us, 1.5 sy, 0.0 ni, 5.5 id, 0.0 wa, 0.0 hi, 0.1 si, 0.0 st
KiB Mem : 25175468+total, 40119300 free, 68494352 used, 14314105+buff/cache
KiB Swap: 0 total, 0 free, 0 used. 18175958+avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
98118 root 20 0 0.138t 0.047t 33032 S 2976 20.2 176:49.70 /usr/bin/java -+
19797 root 20 0 32.913g 390216 25668 S 34.9 0.2 9:42.22 /usr/bin/java -+
Process 98118 is the Spark Master and 19797 is the HDFS Datanode.
When this benchmark was finished I shut down Spark's services in order to give Presto as many resources as possible.
$ sudo /opt/spark/sbin/stop-master.sh
$ sudo /opt/spark/sbin/stop-slaves.sh
Presto Benchmark Results
Below are the fastest times I saw running the following queries using Presto.
$ presto \
--server localhost:8080 \
--catalog hive \
--schema default
The following completed in 11 seconds.
SELECT cab_type,
count(*)
FROM trips_orc
GROUP BY cab_type;
The following completed in 14 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;
The following completed in 16 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 22 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
Launching an AWS EMR Cluster
As of version 5.12.0, AWS EMR doesn't yet support Hadoop 3.0.0 or Spark 2.3.0. With a replication factor of one on a single node I haven't seen significant performance differences between Spark 2.2.1 and 2.3.0, but I have yet to run HDFS benchmarks comparing Hadoop 2.8.x and 3.0.0.
The Presto (0.188) and Hive (2.3.2) versions match the single-instance setup.
It's also important to note that I didn't come across any EC2 instance types supported by EMR that came with NVMe drives. Regular SSDs were the fastest storage I could find.
The following will launch an EMR cluster with a single master node, ten data nodes and ten task nodes. With the exception of the master node, all nodes will be spot instances to keep the cost down.
$ aws emr create-cluster \
--applications \
Name=Hadoop \
Name=Hive \
Name=Presto \
Name=Spark \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--ebs-root-volume-size 10 \
--ec2-attributes '{
"KeyName": "emr",
"InstanceProfile": "EMR_EC2_DefaultRole",
"SubnetId": "subnet-6a55a532",
"EmrManagedSlaveSecurityGroup": "sg-9f2607f9",
"EmrManagedMasterSecurityGroup": "sg-902607f6"
}' \
--enable-debugging \
--log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
--instance-groups '[{
"InstanceCount": 10,
"BidPrice": "1",
"InstanceGroupType": "CORE",
"InstanceType": "m3.xlarge",
"Name": "Core - 2"
}, {
"InstanceCount": 1,
"InstanceGroupType": "MASTER",
"InstanceType": "m3.xlarge",
"Name": "Master - 1"
}, {
"InstanceCount": 10,
"BidPrice": "1",
"InstanceGroupType": "TASK",
"InstanceType": "m3.xlarge",
"Name": "Task - 3"
}]' \
--name 'My cluster' \
--region eu-west-1 \
--release-label emr-5.12.0 \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--service-role EMR_DefaultRole \
--termination-protected
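The create-cluster call returns a cluster ID. Something like the following will block until the cluster is running and then report its state; the j-XXXXXXXXXXXXX ID is a placeholder.
$ aws emr wait cluster-running \
--cluster-id j-XXXXXXXXXXXXX \
--region eu-west-1
$ aws emr describe-cluster \
--cluster-id j-XXXXXXXXXXXXX \
--region eu-west-1 \
--query 'Cluster.Status.State'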
With the cluster provisioned and bootstrapped I was able to SSH in.
$ ssh -i ~/.ssh/emr.pem \
hadoop@ec2-54-194-43-34.eu-west-1.compute.amazonaws.com
__| __|_ )
_| ( / Amazon Linux AMI
___|\___|___|
https://aws.amazon.com/amazon-linux-ami/2017.09-release-notes/
4 package(s) needed for security, out of 9 available
Run "sudo yum update" to apply all updates.
EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R
E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R
E::::E M::::::M:::M M:::M::::::M R:::R R::::R
E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R
E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR
E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R
E::::E M:::::M M:::M M:::::M R:::R R::::R
E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R
EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R
E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
The first thing I checked was the HDFS cluster's capacity. The replication factor is set to three, so this dataset will take up around 240 GB of the 689 GB available.
$ hdfs dfsadmin -report \
| grep 'Configured Capacity'
Configured Capacity: 740336517120 (689.49 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
I then created a folder on HDFS for the taxi trip dataset.
$ hdfs dfs -mkdir /trips_orc/
The primary drive on this system only had around 5.8 GB of capacity available and I needed to use it to stage data on its way from S3 to HDFS.
$ df -H
Filesystem Size Used Avail Use% Mounted on
devtmpfs 7.8G 29k 7.8G 1% /dev
tmpfs 7.8G 0 7.8G 0% /dev/shm
/dev/xvda1 11G 4.7G 5.8G 45% /
/dev/xvdb1 5.4G 35M 5.4G 1% /emr
/dev/xvdb2 35G 348M 35G 1% /mnt
/dev/xvdc 41G 35M 41G 1% /mnt1
I ended up using the following script, which downloads the 56 ORC files one at a time and deletes each one from the primary drive once it's on HDFS.
$ PREFIX='s3://<bucket>/orc/'
$ ZEROS='0000'
$ SUFFIX='_0'
$ for i in {00..55}; do
aws s3 cp \
$PREFIX$ZEROS$i$SUFFIX \
./
hdfs dfs -copyFromLocal \
$ZEROS$i$SUFFIX \
/trips_orc/
rm $ZEROS$i$SUFFIX
done
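S3DistCp, which I mentioned earlier, ships with EMR and can copy the files from S3 onto HDFS in parallel without staging them on the primary drive first. For reference, an invocation from the master node looks roughly like this.
$ s3-dist-cp \
--src s3://<bucket>/orc/ \
--dest /trips_orc/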
Creating Tables in Hive on AWS EMR
Below I created the table that points to the ORC data stored across the HDFS cluster. This SQL matches the SQL used to create the single-node table.
$ hive
CREATE EXTERNAL TABLE trips_orc (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/trips_orc/';
Spark SQL on EMR Benchmark Results
The following were the fastest times I saw after running each query multiple times.
$ spark-sql
The following completed in 28 seconds.
SELECT cab_type,
count(*)
FROM trips_orc
GROUP BY cab_type;
The following completed in 31 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;
The following completed in 33 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 80 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
Parquet is a more performant format for Spark but running those benchmarks didn't fit into the time box for this blog post. I'll revisit Spark and Parquet once AWS EMR updates Spark to 2.3.0; ORC performance is supposed to be 2-5x faster with that release as well.
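For reference, converting the ORC table into Parquet could be done from Hive or Spark SQL with something along the lines of the following; I haven't run or timed this here.
CREATE TABLE trips_parquet
STORED AS parquet
AS SELECT *
FROM trips_orc;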
Presto on EMR Benchmark Results
The following were the fastest times I saw after running each query multiple times.
$ presto-cli \
--catalog hive \
--schema default
The following completed in 4.88 seconds.
SELECT cab_type,
count(*)
FROM trips_orc
GROUP BY cab_type;
The following completed in 11 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;
The following completed in 12 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 15 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
Closing Thoughts
Below is a recap of the fastest times, in seconds, seen in the benchmarks above, sorted by the query 1 time.
Query 1 | Query 2 | Query 3 | Query 4 | Setup |
---|---|---|---|---|
4.88 | 11 | 12 | 15 | EMR 5.12.0 & Presto 0.188 |
11 | 14 | 16 | 22 | i3.8xlarge & Presto 0.188 |
22 | 25 | 27 | 65 | i3.8xlarge & Spark 2.3.0 |
28 | 31 | 33 | 80 | EMR 5.12.0 & Spark 2.2.1 |
The increased RAM bandwidth and compute power of the cluster did a lot to speed up Presto's query 1 performance. Though the costs of the two setups are close, the EMR cluster has the added benefit of redundancy.
It's worth noting EMR's setup time: it can take up to 25 minutes for a cluster to become fully functional. Had I used a pre-built AMI for the single-node instance I would have been up and running from the moment I could SSH in. If you provision daily and spend four hours working in Spark between meetings, that's roughly an eighth of your time spent waiting for infrastructure.
That being said, what EMR does during bootstrapping is the result of years of fixing issues and hardening the setup. The installation notes I put together haven't been put through their paces by Amazon's EMR customer base over several years, and even one or two issues in the EC2 setup could cost analysts and platform engineers a few days of productivity.
As I wrote at the beginning of this blog post, I wanted to compare the two methods based on performance, and on that basis the winner is the EMR cluster.