Posted on Mon 30 January 2017 under Databases

1.1 Billion Taxi Rides on AWS EMR 5.3.0 & Spark 2.1.0

Last March I benchmarked Spark 1.6 against the 1.1 billion taxi rides dataset on a 5-node, m3.xlarge EMR 4.3.0 cluster using S3 for storage. The results were some of the slowest I've seen. Though OLAP workloads aren't Spark's key selling point, the lack of good data locality didn't help and it's something I wanted to revisit.

In this blog post I'll benchmark the same dataset using an 11-node, m3.xlarge EMR 5.3.0 cluster running Spark 2.1.0. I'll be storing the data in both Parquet and ORC format on HDFS which will sit on SSD drives.

AWS EMR 4.3.0 shipped with Spark 1.6.0. Since then, EMR 5.3.0 has been released, which comes with Spark 2.1.0. Between versions 1.6.0 and 2.1.0 of Spark there have been over 1,200 improvements to the code base. Changing the default compression to Snappy, using the vectorized Parquet reader by default and faster reading of dictionary-encoded data are just a few of the many performance improvements seen over the past 10 months.

Spark has usually performed better with Parquet files than with ORC files, whereas with Presto it's usually the opposite. I want to see if this is still the case with Spark 2.1.0 and Presto 0.157.1.

The dataset being used is the same one I've used to benchmark Amazon Athena, BigQuery, Elasticsearch, kdb+/q, MapD, PostgreSQL, Presto, Redshift as well as the previous Spark 1.6 benchmark. I've compiled a single-page summary of these benchmarks.

AWS CLI Up and Running

All the following commands were run on a fresh install of Ubuntu 14.04.3.

To start, I'll install the AWS CLI tool and a few dependencies it needs to run.

$ sudo apt update
$ sudo apt install \
    python-pip \
    python-virtualenv
$ virtualenv amazon
$ source amazon/bin/activate
$ pip install awscli

I'll then enter my AWS credentials.

$ read AWS_ACCESS_KEY_ID
$ read AWS_SECRET_ACCESS_KEY
$ export AWS_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY

I'll run configure to make sure eu-west-1 is my default region.

$ aws configure
AWS Access Key ID [********************]:
AWS Secret Access Key [********************]:
Default region name [eu-west-1]: eu-west-1
Default output format [None]:

The following will allow for 100 concurrent requests to run when interacting with S3 and does a good job of saturating my Internet connection.

$ aws configure set \
    default.s3.max_concurrent_requests \
    100

I've created an S3 bucket in the US Standard region called "taxis2". The following will sync the 56 compressed CSV files on my system with that bucket into a folder called "csv".

$ aws s3 sync 20M-blocks/ \
    s3://taxis2/csv/

Launching an EMR Cluster

I'll be launching an 11-node cluster of m3.xlarge instances using the 5.3.0 release of AWS EMR. This comes with Hadoop 2.7.3, Hive 2.1.1, Spark 2.1.0 and Presto 0.157.1.

There will be one master node using an on-demand instance costing $0.293 / hour plus five core and five task nodes all using spot instances that will cost at most $0.05 / hour each.

$ aws emr create-cluster \
    --applications \
        Name=Hadoop \
        Name=Hive \
        Name=Spark \
        Name=Presto \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --ec2-attributes '{
        "KeyName": "emr",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "SubnetId": "subnet-6a55a532",
        "EmrManagedSlaveSecurityGroup": "sg-9f2607f9",
        "EmrManagedMasterSecurityGroup": "sg-902607f6"}' \
    --enable-debugging \
    --instance-groups '[{
            "InstanceCount": 1,
            "InstanceGroupType": "MASTER",
            "InstanceType": "m3.xlarge",
            "Name": "Master - 1"
        },{
            "InstanceCount": 5,
            "BidPrice": "0.05",
            "InstanceGroupType": "TASK",
            "InstanceType": "m3.xlarge",
            "Name": "Task - 3"
        },{
            "InstanceCount": 5,
            "BidPrice": "0.05",
            "InstanceGroupType": "CORE",
            "InstanceType": "m3.xlarge",
            "Name": "Core - 2"
        }]' \
    --log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
    --name 'My cluster' \
    --region eu-west-1 \
    --release-label emr-5.3.0 \
    --scale-down-behavior TERMINATE_AT_INSTANCE_HOUR \
    --service-role EMR_DefaultRole \
    --termination-protected
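As a rough sanity check, the hourly price ceiling implied by the figures quoted above can be worked out in a few lines of Python. This is only a sketch: the constant and function names are illustrative, and any charges beyond the instance prices quoted in this post (for example an EMR service fee or S3 storage) are deliberately left out because the post doesn't state them.

```python
# Hourly price ceiling for the cluster, using only the prices quoted in the post.
MASTER_ON_DEMAND = 0.293  # USD/hour for the single on-demand m3.xlarge master
SPOT_BID = 0.05           # USD/hour, the maximum bid per spot instance
CORE_NODES = 5
TASK_NODES = 5


def max_hourly_cost(master=MASTER_ON_DEMAND, bid=SPOT_BID,
                    spot_nodes=CORE_NODES + TASK_NODES):
    # The spot nodes are described as costing "at most" the bid price,
    # so the result is an upper bound rather than the actual bill.
    return master + bid * spot_nodes


print(f"At most ${max_hourly_cost():.3f} / hour for {1 + CORE_NODES + TASK_NODES} nodes")
```

The real cost depends on the spot market at the time the cluster runs, so treat the printed value as a ceiling for the instance charges only.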

After 15 minutes the machines had all been provisioned and bootstrapped and I was able to SSH into the master node.

$ ssh -o ServerAliveInterval=50 \
      -i ~/.ssh/emr.pem \
      hadoop@54.194.196.223
       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2016.09-release-notes/
7 package(s) needed for security, out of 7 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

HDFS has already been set up by EMR with 344.75 GB of capacity, all of which is SSD-backed.

$ hdfs dfsadmin -report | head
Configured Capacity: 370168258560 (344.75 GB)
Present Capacity: 369031170158 (343.69 GB)
DFS Remaining: 368997994496 (343.66 GB)
DFS Used: 33175662 (31.64 MB)
DFS Used%: 0.01%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0

1.1B Trips in Parquet Format

The gzip-compressed CSV files are already up on S3 so I will create a table in Hive pointing to them.

$ screen
$ hive
CREATE EXTERNAL TABLE trips_csv (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(6,3),
    fare_amount             DECIMAL(6,2),
    extra                   DECIMAL(6,2),
    mta_tax                 DECIMAL(6,2),
    tip_amount              DECIMAL(6,2),
    tolls_amount            DECIMAL(6,2),
    ehail_fee               DECIMAL(6,2),
    improvement_surcharge   DECIMAL(6,2),
    total_amount            DECIMAL(6,2),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION 's3://taxis2/csv/';

I'll then create a second table called "trips_parquet" that will store data in Parquet format using Snappy compression. Hive will store this table on HDFS in a folder called /user/hive/warehouse/trips_parquet.

CREATE EXTERNAL TABLE trips_parquet (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS parquet
  TBLPROPERTIES ("parquet.compression"="SNAPPY");

The following will copy and convert the CSV data into Parquet format.

INSERT INTO trips_parquet
SELECT * FROM trips_csv;

The above completed in 1 hour, 39 minutes and 6 seconds.

Query ID = hadoop_20170129101621_bd19132e-5e10-4755-aebd-46ab7dcab946
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1485684315400_0001)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED     56         56        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 5942.56 s
----------------------------------------------------------------------------------------------
Loading data to table default.trips_parquet
OK
Time taken: 5946.652 seconds
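Hive reports the duration in seconds only. A small Python sketch (the helper name to_hms is illustrative, not part of any tool used here) shows how the "Time taken" figure breaks down into hours, minutes and seconds:

```python
def to_hms(seconds):
    """Split a duration in seconds into hours, minutes and leftover seconds."""
    hours, remainder = divmod(seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return int(hours), int(minutes), secs


# "Time taken" as reported by Hive for the Parquet conversion.
h, m, s = to_hms(5946.652)
print(f"{h} h {m} min {s:.3f} s")
```

The leftover seconds are kept as a fraction so that it is the caller's choice whether to round or truncate them.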

61.27% of HDFS' capacity was taken up by the Parquet-formatted data (around 210 GB).

$ hdfs dfsadmin -report | head
Configured Capacity: 370168258560 (344.75 GB)
Present Capacity: 368530566198 (343.22 GB)
DFS Remaining: 142720930206 (132.92 GB)
DFS Used: 225809635992 (210.30 GB)
DFS Used%: 61.27%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0
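The percentage and gigabyte figures in the report can be reproduced from the raw byte counts. The following Python sketch uses the two values printed above; the function names are illustrative. It also shows that the "GB" in the report are powers of 1024 rather than powers of 1000.

```python
# Byte counts copied from the dfsadmin report above.
PRESENT_CAPACITY = 368530566198  # "Present Capacity"
DFS_USED = 225809635992          # "DFS Used"


def used_percent(used, capacity):
    """Share of capacity in use, as a percentage."""
    return 100.0 * used / capacity


def to_gib(n_bytes):
    """Convert bytes to units of 1024 ** 3 bytes, which is what the report labels GB."""
    return n_bytes / 1024 ** 3


print(f"DFS Used%: {used_percent(DFS_USED, PRESENT_CAPACITY):.2f}%")
print(f"DFS Used:  {to_gib(DFS_USED):.2f} GB")
```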

The conversion process created a Parquet counterpart for each CSV file. The gzip-compressed CSV files are around 1.8 GB each. The Snappy-compressed Parquet files are just slightly larger in size.

$ hdfs dfs -ls -R /user/hive/warehouse/trips_parquet/
... 2079571214 2017-01-29 11:55 .../000000_0
... 2053401813 2017-01-29 11:53 .../000001_0
... 2048854301 2017-01-29 11:52 .../000002_0
... 2052561364 2017-01-29 11:53 .../000003_0
... 2050481346 2017-01-29 11:54 .../000004_0
... 2065505270 2017-01-29 11:54 .../000005_0
... 2066827529 2017-01-29 11:54 .../000006_0
... 2067046393 2017-01-29 11:55 .../000007_0
... 2067024340 2017-01-29 11:04 .../000008_0
... 2064177211 2017-01-29 11:15 .../000009_0
... 2047959045 2017-01-29 11:14 .../000010_0
... 2065216794 2017-01-29 10:55 .../000011_0
... 2060736800 2017-01-29 10:57 .../000012_0
... 2053307829 2017-01-29 11:03 .../000013_0
... 2059428750 2017-01-29 11:05 .../000014_0
... 2050781990 2017-01-29 11:50 .../000015_0
... 2049048934 2017-01-29 10:55 .../000016_0
... 2056438442 2017-01-29 11:04 .../000017_0
... 2058827623 2017-01-29 11:40 .../000018_0
... 2048821684 2017-01-29 11:49 .../000019_0
... 2063570750 2017-01-29 11:38 .../000020_0
... 2046578172 2017-01-29 11:39 .../000021_0
... 2042163746 2017-01-29 11:04 .../000022_0
... 2049370077 2017-01-29 11:50 .../000023_0
... 1859978547 2017-01-29 11:40 .../000024_0
... 2068601824 2017-01-29 11:50 .../000025_0
... 2044455767 2017-01-29 11:24 .../000026_0
... 2041972949 2017-01-29 11:39 .../000027_0
... 2042778883 2017-01-29 11:23 .../000028_0
... 2061181140 2017-01-29 11:04 .../000029_0
... 2046059808 2017-01-29 11:40 .../000030_0
... 2074502699 2017-01-29 11:50 .../000031_0
... 2046854667 2017-01-29 11:39 .../000032_0
... 2062188950 2017-01-29 11:03 .../000033_0
... 2056792317 2017-01-29 11:23 .../000034_0
... 2057704636 2017-01-29 11:40 .../000035_0
... 2064976471 2017-01-29 11:38 .../000036_0
... 2049477312 2017-01-29 11:50 .../000037_0
... 2053522738 2017-01-29 11:03 .../000038_0
... 2063258086 2017-01-29 11:39 .../000039_0
... 2062130115 2017-01-29 11:49 .../000040_0
... 1607069101 2017-01-29 11:22 .../000041_0
... 2053130061 2017-01-29 11:40 .../000042_0
... 2049578788 2017-01-29 11:03 .../000043_0
... 2045883456 2017-01-29 11:03 .../000044_0
... 2050745484 2017-01-29 11:03 .../000045_0
... 2048313061 2017-01-29 11:03 .../000046_0
... 2053616037 2017-01-29 11:13 .../000047_0
... 2106530680 2017-01-29 11:50 .../000048_0
... 2053560763 2017-01-29 11:39 .../000049_0
... 2057580280 2017-01-29 11:39 .../000050_0
... 1681822606 2017-01-29 11:21 .../000051_0
... 1530714219 2017-01-29 11:11 .../000052_0
... 1533768620 2017-01-29 11:11 .../000053_0
... 1548224214 2017-01-29 11:34 .../000054_0
... 1397076636 2017-01-29 11:06 .../000055_0
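To compare file sizes without reading the listing by eye, the byte counts can be pulled out with a short Python sketch. It assumes the abbreviated line layout shown above, where the size is the second whitespace-separated field (unabbreviated hdfs dfs -ls output has more leading columns), and it uses only the first three lines of the listing as sample input. The names SAMPLE_LISTING and file_sizes are illustrative.

```python
# First three lines of the abbreviated listing above, used as sample input.
SAMPLE_LISTING = """\
... 2079571214 2017-01-29 11:55 .../000000_0
... 2053401813 2017-01-29 11:53 .../000001_0
... 2048854301 2017-01-29 11:52 .../000002_0
"""


def file_sizes(listing):
    """Return the byte counts found in abbreviated listing lines."""
    sizes = []
    for line in listing.splitlines():
        fields = line.split()
        # In the abbreviated layout the size is the second field.
        if len(fields) >= 2 and fields[1].isdigit():
            sizes.append(int(fields[1]))
    return sizes


sizes = file_sizes(SAMPLE_LISTING)
mean_gib = sum(sizes) / len(sizes) / 1024 ** 3
print(f"{len(sizes)} files, mean size {mean_gib:.2f} GiB")
```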

The Parquet Benchmark

Spark reports query times with granularity to the millisecond. Presto will only report times to the second so keep that in mind when analysing the query times.

First up, I'll benchmark the Parquet-formatted data in Spark.

$ spark-sql

The following completed in 10.19 seconds.

SELECT cab_type,
       count(*)
FROM trips_parquet
GROUP BY cab_type;

The following completed in 8.134 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;

The following completed in 19.624 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 85.942 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

Next, I'll query the same table on the same cluster using Presto.

$ presto-cli \
    --schema default \
    --catalog hive

The following completed in 16 seconds.

SELECT cab_type,
       count(*)
FROM trips_parquet
GROUP BY cab_type;

The following completed in 22 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;

The following completed in 30 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 37 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

1.1B Trips in ORC Format

There isn't enough HDFS capacity to store both the Parquet and ORC datasets so I'll remove the Parquet table and files first.

$ hdfs dfs -rm -r /user/hive/warehouse/trips_parquet
$ hive
DROP TABLE trips_parquet;

I'll then check that there are a few hundred gigs of free space on HDFS.

$ hdfs dfsadmin -report | head
Configured Capacity: 370168258560 (344.75 GB)
Present Capacity: 367607626838 (342.36 GB)
DFS Remaining: 367171158016 (341.95 GB)
DFS Used: 436468822 (416.25 MB)
DFS Used%: 0.12%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0

I'll then launch Hive and create an ORC-formatted table using some compression, stripe and stride settings that have suited this dataset well in the past.

$ hive
CREATE EXTERNAL TABLE trips_orc (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  TBLPROPERTIES ("orc.compress"="SNAPPY",
                 "orc.stripe.size"="67108864",
                 "orc.row.index.stride"="50000");

The following will copy and convert the CSV data into ORC format.

INSERT INTO trips_orc
SELECT * FROM trips_csv;

The above completed in 1 hour, 21 minutes and 49 seconds.

Query ID = hadoop_20170129122340_868b5b63-6783-4382-9618-8aa5f8a16cbf
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1485684315400_0003)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED     56         56        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 4905.12 s
----------------------------------------------------------------------------------------------
Loading data to table default.trips_orc
OK
Time taken: 4908.814 seconds

Once that completed I could see that 70.91% of HDFS capacity had been eaten up. This dataset converted 1.2x faster than its Parquet counterpart but is 1.15x larger.

$ hdfs dfsadmin -report | head
Configured Capacity: 370168258560 (344.75 GB)
Present Capacity: 367736988391 (342.48 GB)
DFS Remaining: 106990582595 (99.64 GB)
DFS Used: 260746405796 (242.84 GB)
DFS Used%: 70.91%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0
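The conversion speed and size ratios can be checked against the figures printed earlier. This Python sketch takes the two "Time taken" values from Hive and the two "DFS Used" byte counts from the reports; the variable names are illustrative. DFS Used also includes a small amount of unrelated data, so the size ratio is an approximation.

```python
# "Time taken" reported by Hive for each conversion, in seconds.
PARQUET_SECONDS = 5946.652
ORC_SECONDS = 4908.814

# "DFS Used" after each conversion, in bytes.
PARQUET_BYTES = 225809635992
ORC_BYTES = 260746405796

speedup = PARQUET_SECONDS / ORC_SECONDS  # how much faster the ORC conversion ran
growth = ORC_BYTES / PARQUET_BYTES       # how much more space the ORC data uses

print(f"ORC converted {speedup:.2f}x faster and is {growth:.2f}x larger")
```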

Each ORC file is around 400-500 MB larger than its gzip-compressed CSV counterpart.

$ hdfs dfs -ls -R /user/hive/warehouse/trips_orc/
... 2373955461 2017-01-29 13:44 .../000000_0
... 2391655597 2017-01-29 13:43 .../000001_0
... 2389090892 2017-01-29 13:42 .../000002_0
... 2386074150 2017-01-29 13:42 .../000003_0
... 2383737570 2017-01-29 13:42 .../000004_0
... 2373355323 2017-01-29 13:44 .../000005_0
... 2374994084 2017-01-29 13:43 .../000006_0
... 2362979370 2017-01-29 13:45 .../000007_0
... 2366033760 2017-01-29 12:54 .../000008_0
... 2364710220 2017-01-29 13:02 .../000009_0
... 2378459534 2017-01-29 13:01 .../000010_0
... 2365652224 2017-01-29 13:08 .../000011_0
... 2350225556 2017-01-29 13:09 .../000012_0
... 2358764639 2017-01-29 13:00 .../000013_0
... 2346091471 2017-01-29 13:02 .../000014_0
... 2358653778 2017-01-29 13:40 .../000015_0
... 2357066399 2017-01-29 13:07 .../000016_0
... 2360479698 2017-01-29 13:40 .../000017_0
... 2380295397 2017-01-29 13:31 .../000018_0
... 2363677667 2017-01-29 13:01 .../000019_0
... 2364109075 2017-01-29 13:31 .../000020_0
... 2366421728 2017-01-29 13:22 .../000021_0
... 2356203058 2017-01-29 13:08 .../000022_0
... 2362475800 2017-01-29 13:40 .../000023_0
... 2088581545 2017-01-29 13:21 .../000024_0
... 2374664877 2017-01-29 13:23 .../000025_0
... 2364000940 2017-01-29 13:40 .../000026_0
... 2360722761 2017-01-29 13:31 .../000027_0
... 2362829388 2017-01-29 13:23 .../000028_0
... 2383698456 2017-01-29 13:10 .../000029_0
... 2358520407 2017-01-29 13:32 .../000030_0
... 2409406129 2017-01-29 13:40 .../000031_0
... 2386976953 2017-01-29 13:21 .../000032_0
... 2355724036 2017-01-29 13:23 .../000033_0
... 2358846741 2017-01-29 13:11 .../000034_0
... 2383852350 2017-01-29 13:31 .../000035_0
... 2395248208 2017-01-29 13:40 .../000036_0
... 2391861144 2017-01-29 13:21 .../000037_0
... 2378207274 2017-01-29 13:11 .../000038_0
... 2393120542 2017-01-29 13:22 .../000039_0
... 2382935318 2017-01-29 13:40 .../000040_0
... 1777333298 2017-01-29 13:31 .../000041_0
... 2386615111 2017-01-29 13:00 .../000042_0
... 2395027003 2017-01-29 13:21 .../000043_0
... 2393883730 2017-01-29 13:22 .../000044_0
... 2381392690 2017-01-29 13:10 .../000045_0
... 2379996162 2017-01-29 12:53 .../000046_0
... 2380507329 2017-01-29 13:01 .../000047_0
... 2381376756 2017-01-29 13:40 .../000048_0
... 2385868026 2017-01-29 13:32 .../000049_0
... 2388616745 2017-01-29 13:21 .../000050_0
... 1879808547 2017-01-29 13:20 .../000051_0
... 1734216308 2017-01-29 13:08 .../000052_0
... 1737411028 2017-01-29 12:59 .../000053_0
... 1724326096 2017-01-29 12:52 .../000054_0
... 1611855003 2017-01-29 12:56 .../000055_0

The ORC Benchmark

I'll launch Spark again and see how fast it can query the ORC-formatted data.

$ spark-sql

The following completed in 46.951 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 54.089 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 56.174 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 131.729 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

I'll then run the same queries in Presto.

$ presto-cli \
    --schema default \
    --catalog hive

The following completed in 10 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 13 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The following completed in 15 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 21 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

Concluding Analysis

The Presto / ORC combination had the fastest cumulative query times but that shouldn't overshadow how far along Spark's OLAP performance has come.
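The cumulative times behind that statement can be tallied with a short Python sketch. The query times are copied from the benchmark sections above; the dictionary layout and names are illustrative.

```python
# Query times in seconds for queries 1-4, copied from the sections above.
TIMES = {
    ("Spark", "Parquet"): [10.19, 8.134, 19.624, 85.942],
    ("Presto", "Parquet"): [16, 22, 30, 37],
    ("Spark", "ORC"): [46.951, 54.089, 56.174, 131.729],
    ("Presto", "ORC"): [10, 13, 15, 21],
}

# Sum the four query times for each engine / format combination.
totals = {combo: sum(times) for combo, times in TIMES.items()}
fastest = min(totals, key=totals.get)

# Print the combinations from fastest to slowest.
for (engine, fmt), total in sorted(totals.items(), key=lambda kv: kv[1]):
    print(f"{engine:<6} {fmt:<7} {total:8.3f} s")
```

Presto's times are only reported to the second, so its totals carry a little more uncertainty than Spark's.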

Queries 1, 2 and 3 return 2, 57 and 134 rows respectively and Spark outperformed Presto by 1.5-2.7x when running these queries with Parquet data. But query 4 returns 4,802 rows and Presto managed to outperform Spark with both Parquet and ORC-formatted data when returning this longer set of results (2.3x and 6.3x faster respectively).

The ORC performance in Spark is still lagging Presto's. Queries were anywhere from 3.7x to 6.3x slower when using this format. That being said, when I looked at Spark 1.6 the ORC queries were ~7.7x slower than their Parquet counterparts; that gap has closed down to 1.5x slower in some cases. I suspect there are a number of ORC-related optimisations Spark could bring in future releases.
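The ratios quoted in the two paragraphs above follow directly from the recorded query times. The Python sketch below recomputes them; the list names and the ratios helper are illustrative, and the Spark 1.6 figure is not recomputed because those timings aren't part of this post.

```python
# Query times in seconds for queries 1-4, copied from the sections above.
SPARK_PARQUET = [10.19, 8.134, 19.624, 85.942]
PRESTO_PARQUET = [16, 22, 30, 37]
SPARK_ORC = [46.951, 54.089, 56.174, 131.729]
PRESTO_ORC = [10, 13, 15, 21]


def ratios(slower, faster):
    """Element-wise slower / faster, rounded to one decimal place."""
    return [round(s / f, 1) for s, f in zip(slower, faster)]


# Queries 1-3 on Parquet: how many times faster Spark was than Presto.
spark_lead = ratios(PRESTO_PARQUET[:3], SPARK_PARQUET[:3])

# Query 4: how many times faster Presto was than Spark, per format.
presto_q4_parquet = ratios(SPARK_PARQUET[3:], PRESTO_PARQUET[3:])[0]
presto_q4_orc = ratios(SPARK_ORC[3:], PRESTO_ORC[3:])[0]

# ORC: Spark relative to Presto, and Spark's ORC times relative to its Parquet times.
spark_vs_presto_orc = ratios(SPARK_ORC, PRESTO_ORC)
spark_orc_vs_parquet = ratios(SPARK_ORC, SPARK_PARQUET)

print("Spark lead on Parquet, queries 1-3:", spark_lead)
print("Presto lead on query 4 (Parquet, ORC):", presto_q4_parquet, presto_q4_orc)
print("Spark ORC vs Presto ORC:", spark_vs_presto_orc)
print("Spark ORC vs Spark Parquet:", spark_orc_vs_parquet)
```

Rounding to one decimal place matches the precision used in the text, and Presto's whole-second timings don't support anything finer.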

It's interesting to see the 242 GB of ORC data could be queried quicker than the 210 GB of Parquet data by Presto. This leads me to believe I/O and networking aren't as big bottlenecks as I would have imagined.

The long file conversion times are something I'd like to find optimisations for. Anyone who has some pointers is invited to get in touch.

Thank you for taking the time to read this post. I offer both consulting and hands-on development services to clients in North America and Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2024 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.