Last March I benchmarked Spark 1.6 against the 1.1 billion taxi rides dataset on a 5-node, m3.xlarge EMR 4.3.0 cluster using S3 for storage. The results were some of the slowest I've seen. Though OLAP workloads aren't Spark's key selling point, the lack of good data locality didn't help, and it's something I wanted to revisit.
In this blog post I'll benchmark the same dataset using an 11-node, m3.xlarge EMR 5.3.0 cluster running Spark 2.1.0. I'll be storing the data in both Parquet and ORC formats on HDFS, which will sit on SSD drives.
AWS EMR 4.3.0 shipped with Spark 1.6.0. Since then, EMR 5.3.0 has been released, which comes with Spark 2.1.0. Between versions 1.6.0 and 2.1.0 of Spark there have been over 1,200 improvements to the code base. Changing the default compression codec to Snappy, using the vectorized Parquet reader by default and faster reads of dictionary-encoded columns are just a few of the many performance improvements that have landed over the past 10 months.
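If you want to confirm the first two of those defaults on your own cluster, both are exposed as configuration keys that can be inspected from the spark-sql shell. The two keys below are, to the best of my knowledge, the ones governing this behaviour in Spark 2.1.0; treat this as a quick sanity check rather than gospel.
-- Running SET with a key and no value prints the current setting.
SET spark.sql.parquet.compression.codec;
SET spark.sql.parquet.enableVectorizedReader;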
Spark has usually performed better with Parquet files than with ORC files, whereas with Presto it's usually the opposite. I want to see if this is still the case with Spark 2.1.0 and Presto 0.157.1.
The dataset being used is the same one I've used to benchmark Amazon Athena, BigQuery, Elasticsearch, kdb+/q, MapD, PostgreSQL, Presto, Redshift as well as the previous Spark 1.6 benchmark. I've compiled a single-page summary of these benchmarks.
AWS CLI Up and Running
All the following commands were run on a fresh install of Ubuntu 14.04.3.
To start, I'll install the AWS CLI tool and a few dependencies it needs to run.
$ sudo apt update
$ sudo apt install \
python-pip \
python-virtualenv
$ virtualenv amazon
$ source amazon/bin/activate
$ pip install awscli
I'll then enter my AWS credentials.
$ read AWS_ACCESS_KEY_ID
$ read AWS_SECRET_ACCESS_KEY
$ export AWS_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY
I'll run configure to make sure eu-west-1 is my default region.
$ aws configure
AWS Access Key ID [********************]:
AWS Secret Access Key [********************]:
Default region name [eu-west-1]: eu-west-1
Default output format [None]:
The following will allow up to 100 concurrent requests when interacting with S3 and does a good job of saturating my Internet connection.
$ aws configure set \
default.s3.max_concurrent_requests \
100
I've created an S3 bucket in the US Standard region called "taxis2". The following will sync the 56 compressed CSV files on my system with that bucket into a folder called "csv".
$ aws s3 sync 20M-blocks/ \
s3://taxis2/csv/
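To double-check the upload, the following optional command reports the object count and total size of everything under that prefix.
$ aws s3 ls s3://taxis2/csv/ \
      --summarize \
      --human-readable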
Launching an EMR Cluster
I'll be launching an 11-node cluster of m3.xlarge instances using the 5.3.0 release of AWS EMR. This comes with Hadoop 2.7.3, Hive 2.1.1, Spark 2.1.0 and Presto 0.157.1.
There will be one master node using an on-demand instance costing $0.293 / hour plus five core and five task nodes, all using spot instances that will cost at most $0.05 / hour each. That caps the whole cluster at $0.293 + 10 x $0.05 = $0.793 / hour.
$ aws emr create-cluster \
--applications \
Name=Hadoop \
Name=Hive \
Name=Spark \
Name=Presto \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--ec2-attributes '{
"KeyName": "emr",
"InstanceProfile": "EMR_EC2_DefaultRole",
"SubnetId": "subnet-6a55a532",
"EmrManagedSlaveSecurityGroup": "sg-9f2607f9",
"EmrManagedMasterSecurityGroup": "sg-902607f6"}' \
--enable-debugging \
--instance-groups '[{
"InstanceCount": 1,
"InstanceGroupType": "MASTER",
"InstanceType": "m3.xlarge",
"Name": "Master - 1"
},{
"InstanceCount": 5,
"BidPrice": "0.05",
"InstanceGroupType": "TASK",
"InstanceType": "m3.xlarge",
"Name": "Task - 3"
},{
"InstanceCount": 5,
"BidPrice": "0.05",
"InstanceGroupType": "CORE",
"InstanceType": "m3.xlarge",
"Name": "Core - 2"
}]' \
--log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
--name 'My cluster' \
--region eu-west-1 \
--release-label emr-5.3.0 \
--scale-down-behavior TERMINATE_AT_INSTANCE_HOUR \
--service-role EMR_DefaultRole \
--termination-protected
After 15 minutes the machines had all been provisioned and bootstrapped and I was able to SSH into the master node.
$ ssh -o ServerAliveInterval=50 \
-i ~/.ssh/emr.pem \
hadoop@54.194.196.223
__| __|_ )
_| ( / Amazon Linux AMI
___|\___|___|
https://aws.amazon.com/amazon-linux-ami/2016.09-release-notes/
7 package(s) needed for security, out of 7 available
Run "sudo yum update" to apply all updates.
EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R
E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R
E::::E M::::::M:::M M:::M::::::M R:::R R::::R
E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R
E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR
E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R
E::::E M:::::M M:::M M:::::M R:::R R::::R
E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R
EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R
E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
HDFS has already been set up by EMR with 344.75 GB of capacity, all of which is SSD-backed.
$ hdfs dfsadmin -report | head
Configured Capacity: 370168258560 (344.75 GB)
Present Capacity: 369031170158 (343.69 GB)
DFS Remaining: 368997994496 (343.66 GB)
DFS Used: 33175662 (31.64 MB)
DFS Used%: 0.01%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0
1.1B Trips in Parquet Format
The gzip-compressed CSV files are already up on S3, so I'll create a table in Hive pointing to them.
$ screen
$ hive
CREATE EXTERNAL TABLE trips_csv (
trip_id INT,
vendor_id VARCHAR(3),
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag VARCHAR(1),
rate_code_id SMALLINT,
pickup_longitude DECIMAL(18,14),
pickup_latitude DECIMAL(18,14),
dropoff_longitude DECIMAL(18,14),
dropoff_latitude DECIMAL(18,14),
passenger_count SMALLINT,
trip_distance DECIMAL(6,3),
fare_amount DECIMAL(6,2),
extra DECIMAL(6,2),
mta_tax DECIMAL(6,2),
tip_amount DECIMAL(6,2),
tolls_amount DECIMAL(6,2),
ehail_fee DECIMAL(6,2),
improvement_surcharge DECIMAL(6,2),
total_amount DECIMAL(6,2),
payment_type VARCHAR(3),
trip_type SMALLINT,
pickup VARCHAR(50),
dropoff VARCHAR(50),
cab_type VARCHAR(6),
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel VARCHAR(10),
pickup_borocode SMALLINT,
pickup_boroname VARCHAR(13),
pickup_ct2010 VARCHAR(6),
pickup_boroct2010 VARCHAR(7),
pickup_cdeligibil VARCHAR(1),
pickup_ntacode VARCHAR(4),
pickup_ntaname VARCHAR(56),
pickup_puma VARCHAR(4),
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel VARCHAR(10),
dropoff_borocode SMALLINT,
dropoff_boroname VARCHAR(13),
dropoff_ct2010 VARCHAR(6),
dropoff_boroct2010 VARCHAR(7),
dropoff_cdeligibil VARCHAR(1),
dropoff_ntacode VARCHAR(4),
dropoff_ntaname VARCHAR(56),
dropoff_puma VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://taxis2/csv/';
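Before kicking off any long-running conversions, an optional sanity check like the one below confirms Hive can read the gzip-compressed files transparently. Thanks to Hive's fetch task optimisation it should only need to read the beginning of a single file.
SELECT trip_id, cab_type, total_amount
FROM trips_csv
LIMIT 3;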
I'll then create a second table called "trips_parquet" that will store data in Parquet format using Snappy compression. Hive will store this table on HDFS in a folder called /user/hive/warehouse/trips_parquet.
CREATE EXTERNAL TABLE trips_parquet (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS parquet
TBLPROPERTIES ("parquet.compression"="SNAPPY");
The following will copy and convert the CSV data into Parquet format.
INSERT INTO trips_parquet
SELECT * FROM trips_csv;
The above completed in 1 hour, 39 minutes and 6 seconds.
Query ID = hadoop_20170129101621_bd19132e-5e10-4755-aebd-46ab7dcab946
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1485684315400_0001)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 56 56 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 5942.56 s
----------------------------------------------------------------------------------------------
Loading data to table default.trips_parquet
OK
Time taken: 5946.652 seconds
61.27% of HDFS' capacity was taken up by the Parquet-formatted data (around 210 GB).
$ hdfs dfsadmin -report | head
Configured Capacity: 370168258560 (344.75 GB)
Present Capacity: 368530566198 (343.22 GB)
DFS Remaining: 142720930206 (132.92 GB)
DFS Used: 225809635992 (210.30 GB)
DFS Used%: 61.27%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0
The conversion process created a Parquet counterpart for each CSV file. The gzip-compressed CSV files are around 1.8 GB each; the Snappy-compressed Parquet files come in just slightly larger at around 2 GB each.
$ hdfs dfs -ls -R /user/hive/warehouse/trips_parquet/
... 2079571214 2017-01-29 11:55 .../000000_0
... 2053401813 2017-01-29 11:53 .../000001_0
... 2048854301 2017-01-29 11:52 .../000002_0
... 2052561364 2017-01-29 11:53 .../000003_0
... 2050481346 2017-01-29 11:54 .../000004_0
... 2065505270 2017-01-29 11:54 .../000005_0
... 2066827529 2017-01-29 11:54 .../000006_0
... 2067046393 2017-01-29 11:55 .../000007_0
... 2067024340 2017-01-29 11:04 .../000008_0
... 2064177211 2017-01-29 11:15 .../000009_0
... 2047959045 2017-01-29 11:14 .../000010_0
... 2065216794 2017-01-29 10:55 .../000011_0
... 2060736800 2017-01-29 10:57 .../000012_0
... 2053307829 2017-01-29 11:03 .../000013_0
... 2059428750 2017-01-29 11:05 .../000014_0
... 2050781990 2017-01-29 11:50 .../000015_0
... 2049048934 2017-01-29 10:55 .../000016_0
... 2056438442 2017-01-29 11:04 .../000017_0
... 2058827623 2017-01-29 11:40 .../000018_0
... 2048821684 2017-01-29 11:49 .../000019_0
... 2063570750 2017-01-29 11:38 .../000020_0
... 2046578172 2017-01-29 11:39 .../000021_0
... 2042163746 2017-01-29 11:04 .../000022_0
... 2049370077 2017-01-29 11:50 .../000023_0
... 1859978547 2017-01-29 11:40 .../000024_0
... 2068601824 2017-01-29 11:50 .../000025_0
... 2044455767 2017-01-29 11:24 .../000026_0
... 2041972949 2017-01-29 11:39 .../000027_0
... 2042778883 2017-01-29 11:23 .../000028_0
... 2061181140 2017-01-29 11:04 .../000029_0
... 2046059808 2017-01-29 11:40 .../000030_0
... 2074502699 2017-01-29 11:50 .../000031_0
... 2046854667 2017-01-29 11:39 .../000032_0
... 2062188950 2017-01-29 11:03 .../000033_0
... 2056792317 2017-01-29 11:23 .../000034_0
... 2057704636 2017-01-29 11:40 .../000035_0
... 2064976471 2017-01-29 11:38 .../000036_0
... 2049477312 2017-01-29 11:50 .../000037_0
... 2053522738 2017-01-29 11:03 .../000038_0
... 2063258086 2017-01-29 11:39 .../000039_0
... 2062130115 2017-01-29 11:49 .../000040_0
... 1607069101 2017-01-29 11:22 .../000041_0
... 2053130061 2017-01-29 11:40 .../000042_0
... 2049578788 2017-01-29 11:03 .../000043_0
... 2045883456 2017-01-29 11:03 .../000044_0
... 2050745484 2017-01-29 11:03 .../000045_0
... 2048313061 2017-01-29 11:03 .../000046_0
... 2053616037 2017-01-29 11:13 .../000047_0
... 2106530680 2017-01-29 11:50 .../000048_0
... 2053560763 2017-01-29 11:39 .../000049_0
... 2057580280 2017-01-29 11:39 .../000050_0
... 1681822606 2017-01-29 11:21 .../000051_0
... 1530714219 2017-01-29 11:11 .../000052_0
... 1533768620 2017-01-29 11:11 .../000053_0
... 1548224214 2017-01-29 11:34 .../000054_0
... 1397076636 2017-01-29 11:06 .../000055_0
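Note that the 210 GB DFS Used figure includes replication. The 56 Parquet files themselves only add up to around 105 GB; if, as I believe is the case on EMR clusters with between four and nine core nodes, HDFS defaults to a replication factor of 2, that accounts for the doubling. The following two commands will report the raw total and the replication factor respectively.
$ hdfs dfs -du -s -h /user/hive/warehouse/trips_parquet/
$ hdfs getconf -confKey dfs.replication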
The Parquet Benchmark
Spark reports query times with millisecond granularity whereas Presto will only report times to the second, so keep that in mind when analysing the query times below.
First up, I'll benchmark the Parquet-formatted data in Spark.
$ spark-sql
The following completed in 10.19 seconds.
SELECT cab_type,
count(*)
FROM trips_parquet
GROUP BY cab_type;
The following completed in 8.134 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;
The following completed in 19.624 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_parquet
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 85.942 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
Next, I'll query the same table on the same cluster using Presto.
$ presto-cli \
--schema default \
--catalog hive
The following completed in 16 seconds.
SELECT cab_type,
count(*)
FROM trips_parquet
GROUP BY cab_type;
The following completed in 22 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;
The following completed in 30 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_parquet
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 37 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
1.1B Trips in ORC Format
There isn't enough HDFS capacity to store both the Parquet and ORC datasets, so I'll remove the Parquet table and its files first. Since trips_parquet is an external table, dropping it won't delete the underlying files, hence the explicit removal from HDFS.
$ hdfs dfs -rm -r /user/hive/warehouse/trips_parquet
$ hive
DROP TABLE trips_parquet;
I'll then check that there are a few hundred gigs of free space on HDFS.
$ hdfs dfsadmin -report | head
Configured Capacity: 370168258560 (344.75 GB)
Present Capacity: 367607626838 (342.36 GB)
DFS Remaining: 367171158016 (341.95 GB)
DFS Used: 436468822 (416.25 MB)
DFS Used%: 0.12%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0
I'll then launch Hive and create an ORC-formatted table using compression, stripe and stride settings that have suited this dataset well in the past: Snappy compression, 64 MB (67,108,864-byte) stripes and a 50,000-row index stride.
$ hive
CREATE EXTERNAL TABLE trips_orc (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY",
"orc.stripe.size"="67108864",
"orc.row.index.stride"="50000");
The following will copy and convert the CSV data into ORC format.
INSERT INTO trips_orc
SELECT * FROM trips_csv;
The above completed in 1 hour, 21 minutes and 49 seconds.
Query ID = hadoop_20170129122340_868b5b63-6783-4382-9618-8aa5f8a16cbf
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1485684315400_0003)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 56 56 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 4905.12 s
----------------------------------------------------------------------------------------------
Loading data to table default.trips_orc
OK
Time taken: 4908.814 seconds
Once that had completed, I could see that 70.91% of HDFS capacity had been eaten up. This dataset converted 1.2x faster than its Parquet counterpart but is 1.15x larger.
$ hdfs dfsadmin -report | head
Configured Capacity: 370168258560 (344.75 GB)
Present Capacity: 367736988391 (342.48 GB)
DFS Remaining: 106990582595 (99.64 GB)
DFS Used: 260746405796 (242.84 GB)
DFS Used%: 70.91%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0
Each ORC file is around 400-500 MB larger than its gzip-compressed CSV counterpart.
$ hdfs dfs -ls -R /user/hive/warehouse/trips_orc/
... 2373955461 2017-01-29 13:44 .../000000_0
... 2391655597 2017-01-29 13:43 .../000001_0
... 2389090892 2017-01-29 13:42 .../000002_0
... 2386074150 2017-01-29 13:42 .../000003_0
... 2383737570 2017-01-29 13:42 .../000004_0
... 2373355323 2017-01-29 13:44 .../000005_0
... 2374994084 2017-01-29 13:43 .../000006_0
... 2362979370 2017-01-29 13:45 .../000007_0
... 2366033760 2017-01-29 12:54 .../000008_0
... 2364710220 2017-01-29 13:02 .../000009_0
... 2378459534 2017-01-29 13:01 .../000010_0
... 2365652224 2017-01-29 13:08 .../000011_0
... 2350225556 2017-01-29 13:09 .../000012_0
... 2358764639 2017-01-29 13:00 .../000013_0
... 2346091471 2017-01-29 13:02 .../000014_0
... 2358653778 2017-01-29 13:40 .../000015_0
... 2357066399 2017-01-29 13:07 .../000016_0
... 2360479698 2017-01-29 13:40 .../000017_0
... 2380295397 2017-01-29 13:31 .../000018_0
... 2363677667 2017-01-29 13:01 .../000019_0
... 2364109075 2017-01-29 13:31 .../000020_0
... 2366421728 2017-01-29 13:22 .../000021_0
... 2356203058 2017-01-29 13:08 .../000022_0
... 2362475800 2017-01-29 13:40 .../000023_0
... 2088581545 2017-01-29 13:21 .../000024_0
... 2374664877 2017-01-29 13:23 .../000025_0
... 2364000940 2017-01-29 13:40 .../000026_0
... 2360722761 2017-01-29 13:31 .../000027_0
... 2362829388 2017-01-29 13:23 .../000028_0
... 2383698456 2017-01-29 13:10 .../000029_0
... 2358520407 2017-01-29 13:32 .../000030_0
... 2409406129 2017-01-29 13:40 .../000031_0
... 2386976953 2017-01-29 13:21 .../000032_0
... 2355724036 2017-01-29 13:23 .../000033_0
... 2358846741 2017-01-29 13:11 .../000034_0
... 2383852350 2017-01-29 13:31 .../000035_0
... 2395248208 2017-01-29 13:40 .../000036_0
... 2391861144 2017-01-29 13:21 .../000037_0
... 2378207274 2017-01-29 13:11 .../000038_0
... 2393120542 2017-01-29 13:22 .../000039_0
... 2382935318 2017-01-29 13:40 .../000040_0
... 1777333298 2017-01-29 13:31 .../000041_0
... 2386615111 2017-01-29 13:00 .../000042_0
... 2395027003 2017-01-29 13:21 .../000043_0
... 2393883730 2017-01-29 13:22 .../000044_0
... 2381392690 2017-01-29 13:10 .../000045_0
... 2379996162 2017-01-29 12:53 .../000046_0
... 2380507329 2017-01-29 13:01 .../000047_0
... 2381376756 2017-01-29 13:40 .../000048_0
... 2385868026 2017-01-29 13:32 .../000049_0
... 2388616745 2017-01-29 13:21 .../000050_0
... 1879808547 2017-01-29 13:20 .../000051_0
... 1734216308 2017-01-29 13:08 .../000052_0
... 1737411028 2017-01-29 12:59 .../000053_0
... 1724326096 2017-01-29 12:52 .../000054_0
... 1611855003 2017-01-29 12:56 .../000055_0
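To verify the stripe and compression settings actually took effect, Hive's ORC file dump utility can be pointed at any one of these files. The path below is just the first file from the listing above; among other things, the dump reports the compression codec and the size of each stripe.
$ hive --orcfiledump /user/hive/warehouse/trips_orc/000000_0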
The ORC Benchmark
I'll launch Spark again and see how fast it can query the ORC-formatted data.
$ spark-sql
The following completed in 46.951 seconds.
SELECT cab_type,
count(*)
FROM trips_orc
GROUP BY cab_type;
The following completed in 54.089 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;
The following completed in 56.174 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 131.729 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
I'll then run the same queries in Presto.
$ presto-cli \
--schema default \
--catalog hive
The following completed in 10 seconds.
SELECT cab_type,
count(*)
FROM trips_orc
GROUP BY cab_type;
The following completed in 13 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;
The following completed in 15 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 21 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_orc
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
Concluding Analysis
The Presto / ORC combination had the fastest cumulative query times, but that shouldn't overshadow how far Spark's OLAP performance has come.
Queries 1, 2 and 3 return 2, 57 and 134 rows respectively and Spark outperformed Presto by 1.5x - 2.7x when running them on the Parquet data. But query 4 returns 4,802 rows and Presto managed to outperform Spark on both the Parquet- and ORC-formatted data when returning this long result set (2.3x and 6.3x faster respectively).
Spark's ORC performance still lags Presto's: queries were anywhere from 3.7x to 6.3x slower when using this format. That being said, when I looked at Spark 1.6 the ORC queries were ~7.7x slower than their Parquet counterparts; that gap has closed to as little as 1.5x in some cases. I suspect there are a number of ORC-related optimisations Spark could bring in future releases.
It's interesting to see that Presto could query the 242 GB of ORC data quicker than the 210 GB of Parquet data. This leads me to believe I/O and networking aren't as big a bottleneck as I would have imagined.
The long file conversion times are something I'd like to find optimisations for. Anyone who has some pointers is invited to get in touch.
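One idea I'd like to test: gzip isn't a splittable format, so each conversion is capped at 56 map tasks, one per CSV file, and each of those tasks does both the decompression and the columnar encoding. Forcing a shuffle would move the encoding work onto a separate set of reducers. The sketch below is untested and the bucket count of 40 is an arbitrary figure that would need tuning.
-- Hypothetical: shuffle the rows so the Parquet encoding runs
-- in ~40 reduce tasks instead of inside the 56 map tasks.
SET hive.exec.reducers.max=40;

INSERT INTO trips_parquet
SELECT * FROM trips_csv
DISTRIBUTE BY trip_id % 40;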