Last year I ran a $3 / hour challenge between a beefy EC2 instance and a 21-node EMR cluster. In that benchmark the cluster came out ahead, but the version of Spark it ran didn't yet include the various ORC file reader improvements I knew were in the pipeline for Spark 2.4. In November Spark 2.4.0 was finally released and last month AWS EMR added support for it.
In this benchmark I'll look at how far Spark's performance has come along against the latest version of Presto supported on EMR. I'll also be looking at file format performance using both ORC- and Parquet-formatted copies of the dataset.
The dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together.
This is the same dataset I've used to benchmark Amazon Athena, BigQuery, BrytlytDB, ClickHouse, Elasticsearch, kdb+/q, MapD (now known as OmniSci), PostgreSQL, Redshift and Vertica. I have a single-page summary of all these benchmarks for comparison.
AWS EMR Up & Running
The following will launch an EMR cluster with a single master node and 20 core nodes. The cluster runs version 2.8.5 of Amazon's Hadoop distribution, Hive 2.3.4, Presto 0.214 and Spark 2.4.0. All nodes are spot instances to keep the cost down. The final price I paid for all 21 machines was $1.55 / hour including the cost of the 400 GB EBS volume on the master node.
$ aws emr create-cluster \
    --applications \
        Name=Hadoop \
        Name=Hive \
        Name=Presto \
        Name=Spark \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --ebs-root-volume-size 10 \
    --ec2-attributes '{
        "KeyName": "emr",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "EmrManagedSlaveSecurityGroup": "sg-...",
        "EmrManagedMasterSecurityGroup": "sg-..."
    }' \
    --enable-debugging \
    --instance-groups '[
        {
            "Name": "Core - 2",
            "InstanceCount": 20,
            "BidPrice": "OnDemandPrice",
            "InstanceType": "m3.xlarge",
            "InstanceGroupType": "CORE"
        },
        {
            "InstanceCount": 1,
            "Name": "Master - 1",
            "InstanceGroupType": "MASTER",
            "EbsConfiguration": {
                "EbsOptimized": false,
                "EbsBlockDeviceConfigs": [
                    {
                        "VolumeSpecification": {
                            "VolumeType": "standard",
                            "SizeInGB": 400
                        },
                        "VolumesPerInstance": 1
                    }
                ]
            },
            "BidPrice": "OnDemandPrice",
            "InstanceType": "m3.xlarge"
        }
    ]' \
    --log-uri 's3n://aws-logs-...-eu-west-1/elasticmapreduce/' \
    --name 'My cluster' \
    --region eu-west-1 \
    --release-label emr-5.20.0 \
    --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
    --service-role EMR_DefaultRole \
    --termination-protected
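Provisioning and bootstrapping takes a few minutes. If you'd rather not eyeball the console, something like the following should report when the cluster reaches its WAITING state; the j-... cluster id is a placeholder for whatever create-cluster returned.
$ aws emr describe-cluster \
    --cluster-id j-... \
    --query 'Cluster.Status.State'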
With the cluster provisioned and bootstrapped, I was able to SSH into the master node.
$ ssh -i ~/.ssh/emr.pem \
hadoop@54.78.224.209
__| __|_ )
_| ( / Amazon Linux AMI
___|\___|___|
https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/
3 package(s) needed for security, out of 6 available
Run "sudo yum update" to apply all updates.
EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R
E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R
E::::E M::::::M:::M M:::M::::::M R:::R R::::R
E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R
E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR
E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R
E::::E M:::::M M:::M M:::::M R:::R R::::R
E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R
EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R
E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
I'll be using the same ORC and Parquet files I used in last year's benchmark. They're stored on AWS S3, so I'll configure the AWS CLI with my access and secret keys before retrieving them.
$ aws configure
I'll then raise the client's concurrent request limit to 100 so the files download more quickly than they would with the stock settings.
$ aws configure set \
default.s3.max_concurrent_requests \
100
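As a quick sanity check, the CLI should be able to read the setting back; if it echoes 100 the override took effect.
$ aws configure get \
    default.s3.max_concurrent_requests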
I've requested a 400 GB EBS storage volume on the master node so that I can download the two datasets before loading them onto HDFS.
$ df -h
Filesystem Size Used Avail Use% Mounted on
devtmpfs 7.4G 80K 7.4G 1% /dev
tmpfs 7.4G 0 7.4G 0% /dev/shm
/dev/xvda1 9.8G 4.6G 5.1G 48% /
/dev/xvdb1 5.0G 33M 5.0G 1% /emr
/dev/xvdb2 33G 289M 33G 1% /mnt
/dev/xvdc 38G 33M 38G 1% /mnt1
/dev/xvdd 400G 33M 400G 1% /mnt2
I ran the following to pull the datasets off S3.
$ cd /mnt2
$ aws s3 sync s3://<bucket>/orc orc
$ aws s3 sync s3://<bucket>/parquet parquet
Each dataset is around 100 GB. No single node in the cluster has enough disk to store them on its own (let alone with 3x replication), but once loaded onto HDFS they'll be spread across the whole cluster, taking up around 600 GB of capacity after replication (two datasets x ~100 GB x 3 replicas).
Below you can see the cluster has 1.35 TB of HDFS capacity, with each of the 20 core nodes contributing 68.95 GB towards this.
$ hdfs dfsadmin -report \
| grep 'Configured Capacity'
Configured Capacity: 1480673034240 (1.35 TB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
The following will create a folder on HDFS for each dataset format and copy the taxi trip files into those folders.
$ hdfs dfs -mkdir /trips_orc/
$ hdfs dfs -copyFromLocal /mnt2/orc/* /trips_orc/
$ hdfs dfs -mkdir /trips_parquet/
$ hdfs dfs -copyFromLocal /mnt2/parquet/* /trips_parquet/
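As a rough sanity check of the ~600 GB estimate, HDFS can report both the raw size of each folder and the space consumed once replication is taken into account. The exact column layout depends on the Hadoop release, but the replicated figure should come out at roughly three times the raw one.
$ hdfs dfs -du -s -h /trips_orc /trips_parquet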
With the data loaded onto HDFS, I can see it has been distributed fairly evenly across the cluster.
$ hdfs dfsadmin -report \
| grep 'DFS Used%' \
| tail -n +2 \
| sort
DFS Used%: 38.97%
DFS Used%: 39.61%
DFS Used%: 39.75%
DFS Used%: 39.84%
DFS Used%: 40.02%
DFS Used%: 40.17%
DFS Used%: 40.45%
DFS Used%: 40.78%
DFS Used%: 40.92%
DFS Used%: 41.08%
DFS Used%: 41.18%
DFS Used%: 41.62%
DFS Used%: 41.82%
DFS Used%: 42.21%
DFS Used%: 42.38%
DFS Used%: 42.51%
DFS Used%: 42.67%
DFS Used%: 42.79%
DFS Used%: 44.35%
DFS Used%: 44.98%
You might wonder why I haven't used erasure coding, given it only needs half the space of 3x replication. Erasure coding is a Hadoop 3 feature and AWS EMR doesn't support Hadoop 3 yet.
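For reference, on a Hadoop 3 cluster the switch would amount to enabling a Reed-Solomon policy and applying it to a directory before loading the data. The following is only a sketch of what that would look like; it won't run on the EMR release used here. The RS-6-3-1024k policy stores six data blocks plus three parity blocks, for a 1.5x storage overhead instead of 3x.
$ hdfs ec -listPolicies
$ hdfs ec -enablePolicy -policy RS-6-3-1024k
$ hdfs ec -setPolicy -path /trips_orc -policy RS-6-3-1024k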
Below is a spot check to make sure one of the files has been replicated 3 times without issue.
$ hadoop fsck /trips_parquet/000000_0 \
-files \
-blocks \
-racks
Status: HEALTHY
Total size: 2079571240 B
Total dirs: 0
Total files: 1
Total symlinks: 0
Total blocks (validated): 16 (avg. block size 129973202 B)
Minimally replicated blocks: 16 (100.0 %)
Over-replicated blocks: 0 (0.0 %)
Under-replicated blocks: 0 (0.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 3
Average block replication: 3.0
Corrupt blocks: 0
Missing replicas: 0 (0.0 %)
Number of data-nodes: 20
Number of racks: 1
Setting Up Hive Tables
I'll launch Hive and create schemas for both datasets. This will allow both Spark and Presto to see the datasets as tables they can run SQL against.
$ hive
CREATE EXTERNAL TABLE trips_parquet (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS parquet
LOCATION '/trips_parquet/';
CREATE EXTERNAL TABLE trips_orc (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/trips_orc/';
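Before benchmarking, it's worth a quick check that both Spark and Presto can see these tables via the Hive Metastore. Something along these lines should list trips_orc and trips_parquet from each client.
$ spark-sql -e 'SHOW TABLES;'
$ presto-cli \
    --catalog hive \
    --schema default \
    --execute 'SHOW TABLES;'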
Spark SQL on EMR Benchmark Results
The following were the fastest times I saw after running each query multiple times.
$ spark-sql
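The timings come from running each query in the spark-sql client launched above. If you'd rather script a run, one rough approach is to hand a single statement to spark-sql with -e and wrap the call in the shell's time command; keep in mind this also counts Spark's session start-up, so it will overstate the per-query figure.
$ time spark-sql -e "
    SELECT cab_type,
           count(*)
    FROM trips_orc
    GROUP BY cab_type;" > /dev/null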
These four queries were run on the ORC-formatted dataset.
The following completed in 2.362 seconds.
SELECT cab_type,
count(*)
FROM trips_orc
GROUP BY cab_type;
The following completed in 3.559 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;
The following completed in 4.019 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 20.412 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
These four queries were run on the Parquet-formatted dataset.
The following completed in 3.401 seconds.
SELECT cab_type,
count(*)
FROM trips_parquet
GROUP BY cab_type;
The following completed in 3.419 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;
The following completed in 5.441 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_parquet
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 21.307 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
Presto on EMR Benchmark Results
The following were the fastest times I saw after running each query multiple times.
$ presto-cli \
--catalog hive \
--schema default
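As with Spark, the timings come from the interactive client session. For scripted runs, presto-cli accepts a single statement via --execute, which can be wrapped in time the same way; the CLI start-up is lighter than a Spark session's but is still included in the figure.
$ time presto-cli \
    --catalog hive \
    --schema default \
    --execute 'SELECT cab_type, count(*) FROM trips_orc GROUP BY cab_type;' \
    > /dev/null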
These four queries were run on the ORC-formatted dataset.
The following completed in 3.54 seconds.
SELECT cab_type,
count(*)
FROM trips_orc
GROUP BY cab_type;
The following completed in 6.29 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;
The following completed in 7.66 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 11.92 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
These four queries were run on the Parquet-formatted dataset.
The following completed in 7.33 seconds.
SELECT cab_type,
count(*)
FROM trips_parquet
GROUP BY cab_type;
The following completed in 7.17 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;
The following completed in 10.7 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_parquet
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 11.02 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
Thoughts on the Results
Below is a recap of this and last year's benchmarks. All measurements are in seconds.
Q1 | Q2 | Q3 | Q4 | Application Version & Format |
---|---|---|---|---|
2.362 | 3.559 | 4.019 | 20.412 | Spark 2.4.0 & ORC |
3.401 | 3.419 | 5.441 | 21.307 | Spark 2.4.0 & Parquet |
3.54 | 6.29 | 7.66 | 11.92 | Presto 0.214 & ORC |
7.33 | 7.17 | 10.7 | 11.02 | Presto 0.214 & Parquet |
28 | 31 | 33 | 80 | Spark 2.2.1 & ORC |
n/a | n/a | n/a | n/a | Spark 2.2.1 & Parquet |
4.88 | 11 | 12 | 15 | Presto 0.188 & ORC |
n/a | n/a | n/a | n/a | Presto 0.188 & Parquet |
Spark executed Query 1 1.5x faster than Presto, and it did so on ORC-formatted data, which has historically been Presto's most performant format and where its performance edge over Spark used to be found.
The ORC-formatted dataset outperformed Parquet on almost every query for both Spark and Presto. The optimisation work the Apache Spark team has put into their ORC support has tipped the scales against Parquet.
Presto's Parquet performance on Query 3 was almost twice as slow as Spark's. I suspect there is a lot of performance to be found if more engineering time were put into Presto's Parquet reader code.
Presto still handles large result sets faster than Spark. It's almost twice as fast on Query 4 irrespective of file format.
When I ran this benchmark last year on an identically sized 21-node EMR cluster, Spark 2.2.1 was ~12x slower on Query 1 against ORC-formatted data. That is a huge amount of performance to find in the space of a year.
Presto has made performance gains since version 0.188 as well, albeit only a 1.37x speed-up on Query 1. To be fair, Presto has always been very quick with ORC data, so I'm not expecting to see orders-of-magnitude improvements there.