Every analytical database I've used converts imported data into a form that is quicker to read. Often this means storing data in columnar form instead of row form. The taxi trip dataset I benchmark with is around 100 GB when gzip-compressed in row form, but the five columns that are queried can be stored in around 3.5 GB of space in columnar form when compressed using a mixture of dictionary encoding, run-length encoding and Snappy compression.
The process of converting rows into columns is time-consuming and compute-intensive. Most systems can take the better part of an hour to finish this conversion, even when using a cluster of machines. I once believed compression was responsible for most of that overhead, but while researching this post I found Spark 2.4.0 showed only a ~7% difference in conversion time between Snappy, zlib, LZO and no compression at all.
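For reference, the compression codec used by Spark's native ORC writer can be switched per session, which is one way a comparison like the one above could be run. The setting below is a real Spark SQL property; note it only takes effect when Spark's native ORC writer handles the write.
$ spark-sql
SET spark.sql.orc.compression.codec=zlib;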
I've historically used Hive for this conversion but there are ways to mix and match different Hadoop tools, including Spark and Presto, to achieve the same outcome, often with very different processing times.
Spark, Hive and Presto are all very different code bases. Spark is made up of 500K lines of Scala, 110K lines of Java and 40K lines of Python. Presto is made up of 600K lines of Java. Hive is made up of over one million lines of Java and 100K lines of C++ code. Any libraries they share are outweighed by the unique approaches each has taken in the architecture surrounding its SQL parser, query planner, optimizer, code generator and execution engine when it comes to tabular form conversion.
I recently benchmarked Spark 2.4.0 and Presto 0.214 and found that Spark outperformed Presto when it comes to ORC-based queries. In this post I'm going to examine the ORC-writing performance of these two engines plus Hive and see which can convert CSV files into ORC files the fastest.
AWS EMR Up & Running
The following will launch an EMR cluster with a single master node and 20 core nodes. The cluster runs version 2.8.5 of Amazon's Hadoop distribution, Hive 2.3.4, Presto 0.214 and Spark 2.4.0. All nodes are spot instances to keep the cost down.
$ aws emr create-cluster \
--applications \
Name=Hadoop \
Name=Hive \
Name=Presto \
Name=Spark \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--ebs-root-volume-size 10 \
--ec2-attributes '{
"KeyName": "emr",
"InstanceProfile": "EMR_EC2_DefaultRole",
"EmrManagedSlaveSecurityGroup": "sg-...",
"EmrManagedMasterSecurityGroup": "sg-..."
}' \
--enable-debugging \
--instance-groups '[
{
"Name": "Core - 2",
"InstanceCount": 20,
"BidPrice": "OnDemandPrice",
"InstanceType": "m3.xlarge",
"InstanceGroupType": "CORE"
},
{
"InstanceCount": 1,
"Name": "Master - 1",
"InstanceGroupType": "MASTER",
"EbsConfiguration": {
"EbsOptimized": false,
"EbsBlockDeviceConfigs": [
{
"VolumeSpecification": {
"VolumeType": "standard",
"SizeInGB": 400
},
"VolumesPerInstance": 1
}
]
},
"BidPrice": "OnDemandPrice",
"InstanceType": "m3.xlarge"
}
]' \
--log-uri 's3n://aws-logs-...-eu-west-1/elasticmapreduce/' \
--name 'My cluster' \
--region eu-west-1 \
--release-label emr-5.20.0 \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--service-role EMR_DefaultRole \
--termination-protected
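Provisioning and bootstrapping takes several minutes. While that happens, the cluster's state can be watched from the same terminal; this check is optional and nothing later in this post depends on it.
$ aws emr list-clusters --active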
With the cluster provisioned and bootstrapped I was able to SSH in.
$ ssh -i ~/.ssh/emr.pem \
hadoop@54.155.139.6
__| __|_ )
_| ( / Amazon Linux AMI
___|\___|___|
https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/
3 package(s) needed for security, out of 6 available
Run "sudo yum update" to apply all updates.
EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R
E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R
E::::E M::::::M:::M M:::M::::::M R:::R R::::R
E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R
E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR
E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R
E::::E M:::::M M:::M M:::::M R:::R R::::R
E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R
EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R
E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
The CSV dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six-year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together. The files are stored on AWS S3, so I'll configure the aws CLI with my access and secret keys before retrieving them.
$ aws configure
I'll then set the client's concurrent requests limit to 100 so the files download quicker than they would with stock settings.
$ aws configure set \
default.s3.max_concurrent_requests \
100
I've requested an EBS storage volume on the master node so that I can download the dataset before loading it onto HDFS.
$ df -h
Filesystem Size Used Avail Use% Mounted on
devtmpfs 7.4G 80K 7.4G 1% /dev
tmpfs 7.4G 0 7.4G 0% /dev/shm
/dev/xvda1 9.8G 4.6G 5.1G 48% /
/dev/xvdb1 5.0G 33M 5.0G 1% /emr
/dev/xvdb2 33G 289M 33G 1% /mnt
/dev/xvdc 38G 33M 38G 1% /mnt1
/dev/xvdd 400G 33M 400G 1% /mnt2
I ran the following to pull the dataset off of S3.
$ mkdir -p /mnt2/csv/
$ aws s3 sync s3://<bucket>/csv/ /mnt2/csv/
I then ran the following to push the data onto HDFS.
$ hdfs dfs -mkdir /trips_csv/
$ hdfs dfs -copyFromLocal /mnt2/csv/*.csv.gz /trips_csv/
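Before moving on, it's worth sanity-checking that everything arrived; the following reports the total size of the gzip-compressed CSVs now sitting on HDFS.
$ hdfs dfs -du -s -h /trips_csv/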
Converting CSVs to ORC using Hive
I'll use Hive to create a schema catalogue for the various datasets that will be produced in this benchmark. The following will create the table for the CSV-formatted dataset.
$ hive
CREATE TABLE trips_csv (
trip_id INT,
vendor_id VARCHAR(3),
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag VARCHAR(1),
rate_code_id SMALLINT,
pickup_longitude DECIMAL(18,14),
pickup_latitude DECIMAL(18,14),
dropoff_longitude DECIMAL(18,14),
dropoff_latitude DECIMAL(18,14),
passenger_count SMALLINT,
trip_distance DECIMAL(6,3),
fare_amount DECIMAL(6,2),
extra DECIMAL(6,2),
mta_tax DECIMAL(6,2),
tip_amount DECIMAL(6,2),
tolls_amount DECIMAL(6,2),
ehail_fee DECIMAL(6,2),
improvement_surcharge DECIMAL(6,2),
total_amount DECIMAL(6,2),
payment_type VARCHAR(3),
trip_type SMALLINT,
pickup VARCHAR(50),
dropoff VARCHAR(50),
cab_type VARCHAR(6),
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel VARCHAR(10),
pickup_borocode SMALLINT,
pickup_boroname VARCHAR(13),
pickup_ct2010 VARCHAR(6),
pickup_boroct2010 VARCHAR(7),
pickup_cdeligibil VARCHAR(1),
pickup_ntacode VARCHAR(4),
pickup_ntaname VARCHAR(56),
pickup_puma VARCHAR(4),
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel VARCHAR(10),
dropoff_borocode SMALLINT,
dropoff_boroname VARCHAR(13),
dropoff_ct2010 VARCHAR(6),
dropoff_boroct2010 VARCHAR(7),
dropoff_cdeligibil VARCHAR(1),
dropoff_ntacode VARCHAR(4),
dropoff_ntaname VARCHAR(56),
dropoff_puma VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/trips_csv/';
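Before running any conversions, a quick sanity check that Hive can read the gzip-compressed CSVs through this table doesn't hurt. Hive decompresses .gz text files transparently; the query below is optional and just pulls back a few rows.
SELECT trip_id, cab_type, total_amount
FROM trips_csv
LIMIT 3;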
I'll create a table to store the ORC-formatted, Snappy-compressed dataset that Hive will produce.
CREATE TABLE trips_orc_snappy_hive (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/trips_orc_snappy_hive/'
TBLPROPERTIES ("orc.compress"="snappy");
Below I'll convert the CSV dataset into ORC using Hive alone. The following took 55 mins and 5 seconds.
INSERT INTO trips_orc_snappy_hive
SELECT * FROM trips_csv;
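To confirm which codec ended up in the files Hive wrote, the ORC file dump utility that ships with Hive will print each file's metadata, including a Compression field. The file name below is illustrative; list the directory first and substitute a real one.
$ hdfs dfs -ls /trips_orc_snappy_hive/ | head
# 000000_0 is a typical Hive output file name; swap in one from the listing above
$ hive --orcfiledump /trips_orc_snappy_hive/000000_0 | head -n20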
Converting CSVs to ORC using Spark
I'll create a table for Spark to store its ORC-formatted, Snappy-compressed data.
$ hive
CREATE TABLE trips_orc_snappy_spark (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/trips_orc_snappy_spark/'
TBLPROPERTIES ("orc.compress"="snappy");
I'll then launch Spark and convert the CSV data into ORC using its engine.
$ spark-sql
The following took 1 hour, 43 mins and 7 seconds.
INSERT INTO TABLE trips_orc_snappy_spark
SELECT * FROM trips_csv;
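The run above used EMR's stock Spark settings. Spark 2.4 does expose switches that control which ORC writer implementation is used; the keys below are genuine Spark configuration properties, but whether adjusting them improves the conversion time on EMR is something I haven't verified.
$ spark-sql \
    --conf spark.sql.orc.impl=native \
    --conf spark.sql.hive.convertMetastoreOrc=true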
To show that Parquet isn't the better-optimised format for this workload, I'll also create a table storing Snappy-compressed data in Parquet format and run the same CSV conversion.
CREATE TABLE trips_parquet_snappy_spark (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS parquet
LOCATION '/trips_parquet_snappy_spark/'
TBLPROPERTIES ("parquet.compress"="snappy");
The following took 1 hour, 56 mins and 29 seconds.
INSERT INTO TABLE trips_parquet_snappy_spark
SELECT * FROM trips_csv;
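Before clearing anything out it's worth comparing the on-disk sizes of the three datasets produced so far; I didn't record these figures, but the command is the same one used for the Presto outputs later on.
$ hdfs dfs -du -s -h \
    /trips_orc_snappy_hive/ \
    /trips_orc_snappy_spark/ \
    /trips_parquet_snappy_spark/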
The HDFS cluster only has 1.35 TB of capacity and 3x replication is being used so I'll clear out these datasets before continuing.
$ hdfs dfs -rm -r -skipTrash /trips_orc_snappy_hive/
$ hdfs dfs -rm -r -skipTrash /trips_orc_snappy_spark/
$ hdfs dfs -rm -r -skipTrash /trips_parquet_snappy_spark/
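The datasets were removed with -skipTrash so the space is reclaimed immediately; HDFS can confirm how much capacity is free again.
$ hdfs dfs -df -h /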
Converting CSVs to ORC using Presto
Below I'll create a table for Presto to store a Snappy-compressed ORC dataset.
$ hive
CREATE TABLE trips_orc_snappy_presto (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/trips_orc_snappy_presto/'
TBLPROPERTIES ("orc.compress"="snappy");
I'll run the CSV to ORC conversion in Presto's CLI.
$ presto-cli \
--schema default \
--catalog hive
The following took 37 mins and 35 seconds.
INSERT INTO trips_orc_snappy_presto
SELECT * FROM trips_csv;
The above generated a dataset 118.6 GB in size (excluding HDFS replication).
$ hdfs dfs -du -s -h /trips_orc_snappy_presto/
Facebook have been developing ZStandard, a lossless data compression algorithm, and integrating it into various tools in the Hadoop ecosystem. Spark 2.4.0 on EMR with stock settings isn't able to read this compression scheme in ORC files but Presto can both read and write it.
I'll create a ZStandard-compressed table for Presto below using Hive.
$ hive
CREATE TABLE trips_orc_zstd_presto (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/trips_orc_zstd_presto/'
TBLPROPERTIES ("orc.compress"="zstd");
I'll then use Presto to convert the CSV data into ZStandard-compressed ORC files.
$ presto-cli \
--schema default \
--catalog hive
The following took 37 mins and 44 seconds.
INSERT INTO trips_orc_zstd_presto
SELECT * FROM trips_csv;
The above generated 79 GB of data (excluding HDFS replication).
$ hdfs dfs -du -s -h /trips_orc_zstd_presto/
Presto ORC Benchmark: Snappy versus ZStandard
ZStandard did a good job of saving space on HDFS while still converting the data in a very short amount of time. Below I'll look at the impact of the two compression schemes on query performance.
The following were the fastest times I saw after running each query multiple times.
$ presto-cli \
--schema default \
--catalog hive
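The timings below are the wall-clock figures presto-cli reports interactively. The repetition can also be scripted from the shell; the sketch below wraps the first benchmark query in a loop, though the time reported this way includes the CLI's start-up overhead so it reads slightly higher.
$ for i in 1 2 3; do
      time presto-cli \
          --schema default \
          --catalog hive \
          --execute "SELECT cab_type, count(*)
                     FROM trips_orc_snappy_presto
                     GROUP BY cab_type" \
          > /dev/null
  done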
These four queries were run on the Snappy-compressed, ORC-formatted dataset.
The following completed in 5.48 seconds.
SELECT cab_type,
count(*)
FROM trips_orc_snappy_presto
GROUP BY cab_type;
The following completed in 7.85 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_orc_snappy_presto
GROUP BY passenger_count;
The following completed in 8.55 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_orc_snappy_presto
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 11.92 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_orc_snappy_presto
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
These four queries were run on the ZStandard-compressed, ORC-formatted dataset.
The following completed in 4.21 seconds.
SELECT cab_type,
count(*)
FROM trips_orc_zstd_presto
GROUP BY cab_type;
The following completed in 5.97 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_orc_zstd_presto
GROUP BY passenger_count;
The following completed in 7.3 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_orc_zstd_presto
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 11.68 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_orc_zstd_presto
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
Thoughts on the Results
Hive being nearly twice as fast as Spark at converting CSVs into ORC files took me by surprise, as Spark has the younger code base. That being said, Presto being roughly 1.5x faster than Hive was another shocker. I hope that by publishing this post the community becomes more aware of these performance differences and that improvements can be found in future releases of all three packages.
Hive's role as a central store of catalogue data for so many tools in the Hadoop ecosystem cannot be ignored and I expect I'll continue to use it for a long time to come. Where I'm put in an awkward position is that I'd recommend converting CSVs into ORC files with Presto for the significant time savings, yet querying that data afterwards is quicker in Spark. There really is no one tool that rules them all.
ZStandard offers a lot in terms of disk space savings while not impacting query performance significantly. Taking the data and making it 1.5x smaller than its Snappy-compressed equivalent without impacting conversion time is fantastic. When Hadoop 3's Erasure Coding is mixed into the equation, replacing 3x replication with roughly 1.5x of raw overhead, the combination would bring raw space requirements on HDFS down by around two thirds. Being able to buy a petabyte of storage capacity and store three times as much data on it is huge.
I feel that my research into converting CSVs into ORC files is just getting started. Beyond fundamental architectural differences between these three pieces of software, I suspect the stock settings on EMR could be tuned to provide faster conversion times. This isn't something I can prove at this point and it will be the subject of further research.
Spark did receive support for ZStandard in 2.3.0 so I suspect it's nothing more than a configuration change to add support into Spark on EMR. I'm going to keep an eye on future releases of AWS EMR to see if this is added to the stock settings.
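In the meantime, one quick way to see whether the Hadoop native libraries on an EMR node were built with ZStandard support is the checknative report; on builds that include the codec, zstd shows up alongside zlib, snappy and lz4.
$ hadoop checknative -a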