Posted on Wed 02 January 2019 under Presto

1.1 Billion Taxi Rides: Spark 2.4.0 versus Presto 0.214

Last year I did a $3 / hour challenge between a beefy EC2 instance and a 21-node EMR cluster. In that benchmark the cluster ended up being more performant. But the version of Spark used in that benchmark predated various ORC file reader improvements that I knew were in the pipeline for Spark 2.4. In November Spark 2.4.0 was finally released and last month AWS EMR added support for it.

In this benchmark I'll take a look at how well Spark has come along in terms of performance against the latest version of Presto supported on EMR. I'll also be looking at file format performance with both Parquet and ORC-formatted datasets.

The dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six-year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together.

This is the same dataset I've used to benchmark Amazon Athena, BigQuery, BrytlytDB, ClickHouse, Elasticsearch, kdb+/q, MapD (now known as OmniSci), PostgreSQL, Redshift and Vertica. I have a single-page summary of all these benchmarks for comparison.

AWS EMR Up & Running

The following will launch an EMR cluster with a single master node and 20 core nodes. The cluster runs version 2.8.5 of Amazon's Hadoop distribution, Hive 2.3.4, Presto 0.214 and Spark 2.4.0. All nodes are spot instances to keep the cost down. The final price I paid for all 21 machines was $1.55 / hour including the cost of the 400 GB EBS volume on the master node.

$ aws emr create-cluster \
    --applications \
        Name=Hadoop \
        Name=Hive \
        Name=Presto \
        Name=Spark \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --ebs-root-volume-size 10 \
    --ec2-attributes '{
        "KeyName": "emr",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "EmrManagedSlaveSecurityGroup": "sg-...",
        "EmrManagedMasterSecurityGroup": "sg-..."
    }' \
    --enable-debugging \
    --instance-groups '[
        {
            "Name": "Core - 2",
            "InstanceCount": 20,
            "BidPrice": "OnDemandPrice",
            "InstanceType": "m3.xlarge",
            "InstanceGroupType": "CORE"
        },
        {
            "InstanceCount": 1,
            "Name": "Master - 1",
            "InstanceGroupType": "MASTER",
            "EbsConfiguration": {
                "EbsOptimized": false,
                "EbsBlockDeviceConfigs": [
                    {
                        "VolumeSpecification": {
                            "VolumeType": "standard",
                            "SizeInGB": 400
                        },
                        "VolumesPerInstance": 1
                    }
                ]
            },
            "BidPrice": "OnDemandPrice",
            "InstanceType": "m3.xlarge"
        }
    ]' \
    --log-uri 's3n://aws-logs-...-eu-west-1/elasticmapreduce/' \
    --name 'My cluster' \
    --region eu-west-1 \
    --release-label emr-5.20.0 \
    --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
    --service-role EMR_DefaultRole \
    --termination-protected

With the cluster provisioned and bootstrapped I was able to SSH in.

$ ssh -i ~/.ssh/emr.pem \
    hadoop@54.78.224.209
       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/
3 package(s) needed for security, out of 6 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

I'll be using the same ORC and Parquet files I used in last year's benchmark. They're stored on AWS S3 so I'll configure the aws CLI with my access and secret keys and retrieve them.

$ aws configure

I'll then set the client's concurrent requests limit to 100 so the files download quicker than they would with stock settings.

$ aws configure set \
    default.s3.max_concurrent_requests \
    100

I've requested a 400 GB EBS storage volume on the master node so that I can download the two datasets before loading them onto HDFS.

$ df -h
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs        7.4G   80K  7.4G   1% /dev
tmpfs           7.4G     0  7.4G   0% /dev/shm
/dev/xvda1      9.8G  4.6G  5.1G  48% /
/dev/xvdb1      5.0G   33M  5.0G   1% /emr
/dev/xvdb2       33G  289M   33G   1% /mnt
/dev/xvdc        38G   33M   38G   1% /mnt1
/dev/xvdd       400G   33M  400G   1% /mnt2

I ran the following to pull the datasets from S3.

$ cd /mnt2
$ aws s3 sync s3://<bucket>/orc orc
$ aws s3 sync s3://<bucket>/parquet parquet

Each dataset is around 100 GB. No single HDFS node is large enough to store even one of them (let alone with 3x replication) but once loaded onto HDFS they'll be spread across the whole cluster, taking up around 600 GB of total capacity after replication.

Below you can see the cluster has 1.35 TB of capacity with each node contributing 68.95 GB towards this.

$ hdfs dfsadmin -report \
    | grep 'Configured Capacity'
Configured Capacity: 1480673034240 (1.35 TB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
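
As a quick sanity check on the report above, the per-node figures should add up to the cluster-wide one. The following is a minimal sketch, not something from the original run: `check_capacity` is a hypothetical helper name, and the sample input simply replays the numbers shown above instead of calling `hdfs dfsadmin`.

```shell
# Compare the cluster-wide 'Configured Capacity' (first line) with the sum of
# the per-datanode values (remaining lines). Reads report lines on stdin.
check_capacity() {
    awk '
        NR == 1 { total = $3; next }    # cluster-wide figure in bytes
                { sum += $3; nodes++ }  # one line per datanode
        END {
            status = (sum == total) ? "match" : "mismatch"
            printf "%d nodes, %.2f TB, %s\n", nodes, sum / 1024 ^ 4, status
        }
    '
}

# Replay the figures from the report above.
{
    echo 'Configured Capacity: 1480673034240 (1.35 TB)'
    for i in $(seq 20); do
        echo 'Configured Capacity: 74033651712 (68.95 GB)'
    done
} | check_capacity
```

On a live cluster the same function could be fed with `hdfs dfsadmin -report | grep 'Configured Capacity' | check_capacity`.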

The following will create folders on HDFS for each dataset format as well as copy the taxi trip files into those folders.

$ hdfs dfs -mkdir /trips_orc/
$ hdfs dfs -copyFromLocal /mnt2/orc/* /trips_orc/

$ hdfs dfs -mkdir /trips_parquet/
$ hdfs dfs -copyFromLocal /mnt2/parquet/* /trips_parquet/

After the data was loaded onto HDFS I could see it had been distributed fairly evenly across the cluster, with each node ending up between roughly 39% and 45% full.

$ hdfs dfsadmin -report \
    | grep 'DFS Used%' \
    | tail -n +2 \
    | sort
DFS Used%: 38.97%
DFS Used%: 39.61%
DFS Used%: 39.75%
DFS Used%: 39.84%
DFS Used%: 40.02%
DFS Used%: 40.17%
DFS Used%: 40.45%
DFS Used%: 40.78%
DFS Used%: 40.92%
DFS Used%: 41.08%
DFS Used%: 41.18%
DFS Used%: 41.62%
DFS Used%: 41.82%
DFS Used%: 42.21%
DFS Used%: 42.38%
DFS Used%: 42.51%
DFS Used%: 42.67%
DFS Used%: 42.79%
DFS Used%: 44.35%
DFS Used%: 44.98%
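
To put a number on how even the distribution is, the per-node percentages can be reduced to a minimum, maximum and spread. This is only a sketch: `dfs_used_spread` is a made-up helper name, and the sample input uses three of the values listed above.

```shell
# Summarise per-datanode 'DFS Used%' lines read from stdin.
dfs_used_spread() {
    awk '
        { v = $3 + 0 }                  # "38.97%" -> 38.97
        NR == 1 || v < min { min = v }
        NR == 1 || v > max { max = v }
        END {
            printf "min %.2f%%, max %.2f%%, spread %.2f points\n",
                   min, max, max - min
        }
    '
}

# Lowest, a middle and the highest value from the listing above.
printf 'DFS Used%%: %s%%\n' 38.97 41.08 44.98 | dfs_used_spread
```

Piping the filtered `hdfs dfsadmin -report` output from the command above into the function would cover all 20 nodes.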

You might wonder why I haven't used erasure coding as it only needs half the space of 3x replication. Erasure coding is a feature only found in Hadoop 3 and AWS EMR doesn't support Hadoop 3 yet.
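
The "half the space" figure can be worked through with a little arithmetic. The sketch below assumes a Reed-Solomon layout of 6 data and 3 parity blocks, as in Hadoop 3's RS-6-3-1024k policy; the post doesn't name a policy, so treat that choice and the `ec_overhead` helper as illustrative.

```shell
# Raw storage needed per byte of data under Reed-Solomon erasure coding:
# (data blocks + parity blocks) / data blocks.
ec_overhead() {
    awk -v d="$1" -v p="$2" 'BEGIN { printf "%.1f\n", (d + p) / d }'
}

# Roughly 200 GB of taxi data (two ~100 GB datasets) under each scheme.
awk -v ec="$(ec_overhead 6 3)" 'BEGIN {
    printf "3x replication: %d GB\n", 200 * 3
    printf "RS(6,3):        %d GB\n", 200 * ec
}'
```

With these assumptions the overhead drops from 3.0x to 1.5x, so the same data would need about 300 GB instead of 600 GB.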

Below is a spot check to make sure one of the files has been replicated 3 times without issue.

$ hadoop fsck /trips_parquet/000000_0 \
    -files \
    -blocks \
    -racks
Status: HEALTHY
 Total size:    2079571240 B
 Total dirs:    0
 Total files:   1
 Total symlinks:                0
 Total blocks (validated):      16 (avg. block size 129973202 B)
 Minimally replicated blocks:   16 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    3
 Average block replication:     3.0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)
 Number of data-nodes:          20
 Number of racks:               1
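
The block count in that report can be cross-checked by hand. The sketch below assumes the HDFS block size is 128 MB (134,217,728 bytes), which is the stock default in Hadoop 2 but isn't stated in the post; `blocks_needed` is a hypothetical helper.

```shell
# Number of HDFS blocks for a file: ceil(file_size / block_size),
# done with integer arithmetic only.
blocks_needed() {
    local size=$1 block=$2
    echo $(( (size + block - 1) / block ))
}

# File size from the fsck output above, 128 MB block size assumed.
blocks_needed 2079571240 $(( 128 * 1024 * 1024 ))
```

This prints 16, matching the "Total blocks (validated)" line: 15 full blocks plus one partial block.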

Setting Up Hive Tables

I'll launch Hive and create schemas for both datasets. This will allow both Spark and Presto to see the datasets as tables they can run SQL against.

$ hive
CREATE EXTERNAL TABLE trips_parquet (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS parquet
  LOCATION '/trips_parquet/';

CREATE EXTERNAL TABLE trips_orc (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION '/trips_orc/';

Spark SQL on EMR Benchmark Results

The following were the fastest times I saw after running each query multiple times.

$ spark-sql
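
The post doesn't describe how the repeated runs were timed. One way to automate a "fastest of N runs" measurement from the shell is sketched below; `fastest` and `best_of` are hypothetical helpers, and `best_of` relies on GNU `date` for sub-second resolution. Note that timing a whole CLI invocation also includes client and JVM start-up, so the numbers would not be directly comparable with times reported inside an interactive session.

```shell
# Print the smallest value from a list of timings (one per line on stdin).
fastest() {
    sort -n | head -n 1
}

# Run a command N times and print the fastest wall-clock time in seconds.
# Relies on GNU date for nanosecond resolution (%N).
best_of() {
    local runs=$1; shift
    local i start end
    for i in $(seq "$runs"); do
        start=$(date +%s.%N)
        "$@" > /dev/null 2>&1
        end=$(date +%s.%N)
        awk -v s="$start" -v e="$end" 'BEGIN { printf "%.3f\n", e - s }'
    done | fastest
}

# Example (not run here):
#   best_of 5 spark-sql -e 'SELECT cab_type, count(*) FROM trips_orc GROUP BY cab_type;'
```

The same wrapper would work with `presto-cli --catalog hive --schema default --execute '...'`.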

These four queries were run on the ORC-formatted dataset.

The following completed in 2.362 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 3.559 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 4.019 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 20.412 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

These four queries were run on the Parquet-formatted dataset.

The following completed in 3.401 seconds.

SELECT cab_type,
       count(*)
FROM trips_parquet
GROUP BY cab_type;

The following completed in 3.419 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;

The following completed in 5.441 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 21.307 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

Presto on EMR Benchmark Results

The following were the fastest times I saw after running each query multiple times.

$ presto-cli \
    --catalog hive \
    --schema default

These four queries were run on the ORC-formatted dataset.

The following completed in 3.54 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 6.29 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 7.66 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 11.92 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

These four queries were run on the Parquet-formatted dataset.

The following completed in 7.33 seconds.

SELECT cab_type,
       count(*)
FROM trips_parquet
GROUP BY cab_type;

The following completed in 7.17 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;

The following completed in 10.7 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 11.02 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

Thoughts on the Results

Below is a recap of this and last year's benchmarks. All measurements are in seconds.

Q1      Q2      Q3      Q4       Application Version & Format
2.362   3.559   4.019   20.412   Spark 2.4.0 & ORC
3.401   3.419   5.441   21.307   Spark 2.4.0 & Parquet
3.54    6.29    7.66    11.92    Presto 0.214 & ORC
7.33    7.17    10.7    11.02    Presto 0.214 & Parquet
28      31      33      80       Spark 2.2.1 & ORC
n/a     n/a     n/a     n/a      Spark 2.2.1 & Parquet
4.88    11      12      15       Presto 0.188 & ORC
n/a     n/a     n/a     n/a      Presto 0.188 & Parquet

Spark executed Query 1 on the ORC-formatted data 1.5x faster than Presto did. ORC has historically been Presto's most performant format and the one where it held its performance edge over Spark.

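The ORC-based queries outperformed the Parquet ones in six of the eight comparisons across Spark and Presto. The exceptions were Query 2 on Spark and Query 4 on Presto, where Parquet was marginally quicker. All the optimisation work the Apache Spark team has put into their ORC support has tipped the scales against Parquet.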

With Parquet, Presto took almost twice as long as Spark on Query 3. I suspect there could be a lot of performance found if more engineering time were put into the Parquet reader code for Presto.

Presto still handles large result sets faster than Spark. On Query 4 it's roughly 1.7x faster with ORC and 1.9x faster with Parquet.

When I did this benchmark last year on the same-sized 21-node EMR cluster, Spark 2.2.1 was roughly 12x slower on Query 1 using ORC-formatted data. That is a huge amount of performance to find in the space of a year.

Presto has made performance gains since version 0.188 as well, albeit only a roughly 1.38x speed-up on Query 1. To be fair, Presto has always been very quick with ORC data so I'm not expecting to see orders-of-magnitude improvements.
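
The ratios discussed in this section can be recomputed directly from the recap table. The helper below is just a convenience sketch (`speedup` is a made-up name) that divides the slower time by the faster one.

```shell
# Speed-up of a fast run over a slow one: slow_seconds / fast_seconds.
speedup() {
    awk -v slow="$1" -v fast="$2" 'BEGIN { printf "%.2fx\n", slow / fast }'
}

speedup 3.54   2.362   # Q1, ORC: Presto 0.214 vs Spark 2.4.0
speedup 10.7   5.441   # Q3, Parquet: Presto 0.214 vs Spark 2.4.0
speedup 20.412 11.92   # Q4, ORC: Spark 2.4.0 vs Presto 0.214
speedup 21.307 11.02   # Q4, Parquet: Spark 2.4.0 vs Presto 0.214
speedup 28     2.362   # Q1, ORC: Spark 2.2.1 vs Spark 2.4.0
speedup 4.88   3.54    # Q1, ORC: Presto 0.188 vs Presto 0.214
```

Each line prints the factor by which the second timing beats the first, using the seconds reported in the table.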

Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2019 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.