
Posted on Wed 09 January 2019 under Presto

Convert CSVs to ORC Faster

Every analytical database I've used converts imported data into a form that is quicker to read. Often this means storing data in column form instead of row form. The taxi trip dataset I benchmark with is around 100 GB when gzip-compressed in row form but the five columns that are queried can be stored in around 3.5 GB of space in columnar form when compressed using a mixture of dictionary encoding, run-length encoding and Snappy compression.
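To give a feel for why columnar storage compresses so well, here is a toy sketch of dictionary encoding followed by run-length encoding on a single low-cardinality column. The function names and the sample column are made up for illustration, and this is not how ORC implements these encodings internally; it only shows the idea.

```python
from itertools import groupby


def dictionary_encode(values):
    """Map each distinct value to a small integer ID, in first-seen order."""
    dictionary = {}
    ids = []
    for value in values:
        # New values get the next free ID; known values reuse theirs.
        ids.append(dictionary.setdefault(value, len(dictionary)))
    return dictionary, ids


def run_length_encode(ids):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    return [(value, len(list(run))) for value, run in groupby(ids)]


# Hypothetical sample of a cab_type-like column (not taken from the dataset).
cab_type_column = ["yellow"] * 5 + ["green"] * 3 + ["yellow"] * 2

dictionary, ids = dictionary_encode(cab_type_column)
runs = run_length_encode(ids)

print(dictionary)  # {'yellow': 0, 'green': 1}
print(runs)        # [(0, 5), (1, 3), (0, 2)]
```

Ten strings collapse into a two-entry dictionary and three runs. In row form the same values would be interleaved with the other columns, so neither encoding would find much to exploit.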

The process of converting rows into columns is time consuming and compute-intensive. Most systems can take the better part of an hour to finish this conversion, even when using a cluster of machines. I once believed that compression was causing most of the overhead but in researching this post I found Spark 2.4.0 had a ~7% difference in conversion time between using Snappy, zlib, lzo and not using any compression at all.

I've historically used Hive for this conversion process but there are ways to mix and match different Hadoop tools, including Spark and Presto, to get the same outcome and often with very different processing times.

Spark, Hive and Presto are all very different code bases. Spark is made up of 500K lines of Scala, 110K lines of Java and 40K lines of Python. Presto is made up of 600K lines of Java. Hive is made up of over one million lines of Java and 100K lines of C++ code. Any libraries they share are outweighed by the unique approaches they've taken in the architecture surrounding their SQL parsers, query planners, optimizers, code generators and execution engines when it comes to tabular form conversion.

I recently benchmarked Spark 2.4.0 and Presto 0.214 and found that Spark out-performed Presto when it comes to ORC-based queries. In this post I'm going to examine the ORC writing performance of these two engines plus Hive and see which can convert CSV files into ORC files the fastest.

AWS EMR Up & Running

The following will launch an EMR cluster with a single master node and 20 core nodes. The cluster runs version 2.8.5 of Amazon's Hadoop distribution, Hive 2.3.4, Presto 0.214 and Spark 2.4.0. All nodes are spot instances to keep the cost down.

$ aws emr create-cluster \
    --applications \
        Name=Hadoop \
        Name=Hive \
        Name=Presto \
        Name=Spark \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --ebs-root-volume-size 10 \
    --ec2-attributes '{
        "KeyName": "emr",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "EmrManagedSlaveSecurityGroup": "sg-...",
        "EmrManagedMasterSecurityGroup": "sg-..."
    }' \
    --enable-debugging \
    --instance-groups '[
        {
            "Name": "Core - 2",
            "InstanceCount": 20,
            "BidPrice": "OnDemandPrice",
            "InstanceType": "m3.xlarge",
            "InstanceGroupType": "CORE"
        },
        {
            "InstanceCount": 1,
            "Name": "Master - 1",
            "InstanceGroupType": "MASTER",
            "EbsConfiguration": {
                "EbsOptimized": false,
                "EbsBlockDeviceConfigs": [
                    {
                        "VolumeSpecification": {
                            "VolumeType": "standard",
                            "SizeInGB": 400
                        },
                        "VolumesPerInstance": 1
                    }
                ]
            },
            "BidPrice": "OnDemandPrice",
            "InstanceType": "m3.xlarge"
        }
    ]' \
    --log-uri 's3n://aws-logs-...-eu-west-1/elasticmapreduce/' \
    --name 'My cluster' \
    --region eu-west-1 \
    --release-label emr-5.20.0 \
    --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
    --service-role EMR_DefaultRole \
    --termination-protected

With the cluster provisioned and bootstrapped I was able to SSH in.

$ ssh -i ~/.ssh/emr.pem \
    hadoop@54.155.139.6
       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/
3 package(s) needed for security, out of 6 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

The CSV dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six-year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together. The files are stored on AWS S3 so I'll configure the aws CLI with my access and secret keys and retrieve them.

$ aws configure

I'll then set the client's concurrent requests limit to 100 so the files download quicker than they would with stock settings.

$ aws configure set \
    default.s3.max_concurrent_requests \
    100

I've requested an EBS storage volume on the master node so that I can download the dataset before loading it onto HDFS.

$ df -h
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs        7.4G   80K  7.4G   1% /dev
tmpfs           7.4G     0  7.4G   0% /dev/shm
/dev/xvda1      9.8G  4.6G  5.1G  48% /
/dev/xvdb1      5.0G   33M  5.0G   1% /emr
/dev/xvdb2       33G  289M   33G   1% /mnt
/dev/xvdc        38G   33M   38G   1% /mnt1
/dev/xvdd       400G   33M  400G   1% /mnt2

I ran the following to pull the dataset off of S3.

$ mkdir -p /mnt2/csv/
$ aws s3 sync s3://<bucket>/csv/ /mnt2/csv/

I then ran the following to push the data onto HDFS.

$ hdfs dfs -mkdir /trips_csv/
$ hdfs dfs -copyFromLocal /mnt2/csv/*.csv.gz /trips_csv/

Converting CSVs to ORC using Hive

I'll use Hive to create a schema catalogue for the various datasets that will be produced in this benchmark. The following will create the table for the CSV-formatted dataset.

$ hive
CREATE TABLE trips_csv (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(6,3),
    fare_amount             DECIMAL(6,2),
    extra                   DECIMAL(6,2),
    mta_tax                 DECIMAL(6,2),
    tip_amount              DECIMAL(6,2),
    tolls_amount            DECIMAL(6,2),
    ehail_fee               DECIMAL(6,2),
    improvement_surcharge   DECIMAL(6,2),
    total_amount            DECIMAL(6,2),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION '/trips_csv/';

I'll create a table that will store the dataset as an ORC-formatted, Snappy-compressed dataset that will be produced by Hive.

CREATE TABLE trips_orc_snappy_hive (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION '/trips_orc_snappy_hive/'
  TBLPROPERTIES ("orc.compress"="snappy");

Below I'll convert the CSV dataset into ORC using Hive alone. The following took 55 mins and 5 seconds.

INSERT INTO trips_orc_snappy_hive
SELECT * FROM trips_csv;

Converting CSVs to ORC using Spark

I'll create a table for Spark to store its ORC-formatted, Snappy-compressed data.

$ hive
CREATE TABLE trips_orc_snappy_spark (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION '/trips_orc_snappy_spark/'
  TBLPROPERTIES ("orc.compress"="snappy");

I'll then launch Spark and convert the CSV data into ORC using its engine.

$ spark-sql

The following took 1 hour, 43 mins and 7 seconds.

INSERT INTO TABLE trips_orc_snappy_spark
SELECT * FROM trips_csv;

To show that Parquet isn't the more optimised format I'll create a table to store Snappy-compressed data in Parquet format and run the same CSV conversion.

CREATE TABLE trips_parquet_snappy_spark (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS parquet
  LOCATION '/trips_parquet_snappy_spark/'
  TBLPROPERTIES ("parquet.compress"="snappy");

The following took 1 hour, 56 minutes and 29 seconds.

INSERT INTO TABLE trips_parquet_snappy_spark
SELECT * FROM trips_csv;

The HDFS cluster only has 1.35 TB of capacity and 3x replication is being used so I'll clear out these datasets before continuing.

$ hdfs dfs -rm -r -skipTrash /trips_orc_snappy_hive/
$ hdfs dfs -rm -r -skipTrash /trips_orc_snappy_spark/
$ hdfs dfs -rm -r -skipTrash /trips_parquet_snappy_spark/
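The capacity constraint above comes down to simple arithmetic. The sketch below assumes the 1.35 TB figure is raw capacity and that 1 TB = 1000 GB; the helper name is hypothetical.

```python
def usable_capacity_gb(raw_capacity_gb, replication_factor):
    """Logical data that fits when every block is stored replication_factor times."""
    return raw_capacity_gb / replication_factor


# Assumption: 1.35 TB is raw capacity, with 1 TB treated as 1000 GB.
RAW_CAPACITY_GB = 1350
REPLICATION_FACTOR = 3

usable = usable_capacity_gb(RAW_CAPACITY_GB, REPLICATION_FACTOR)
print(f"Usable capacity: {usable:.0f} GB")
```

With roughly 450 GB of usable space shared between the CSV source files and each converted copy, only a few full-size datasets can live on the cluster at once.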

Converting CSVs to ORC using Presto

Below I'll create a table for Presto to store a Snappy-compressed ORC dataset.

$ hive
CREATE TABLE trips_orc_snappy_presto (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION '/trips_orc_snappy_presto/'
  TBLPROPERTIES ("orc.compress"="snappy");

I'll run the CSV to ORC conversion in Presto's CLI.

$ presto-cli \
    --schema default \
    --catalog hive

The following took 37 mins and 35 seconds.

INSERT INTO trips_orc_snappy_presto
SELECT * FROM trips_csv;

The above generated a dataset 118.6 GB in size (excluding HDFS replication).

$ hdfs dfs -du -s -h /trips_orc_snappy_presto/

Facebook have been working on implementing ZStandard, a lossless data compression algorithm, and integrating it into various tools in the Hadoop ecosystem. Spark 2.4.0 on EMR with stock settings isn't able to read this compression scheme but Presto can both read and write it.

I'll create a ZStandard-compressed table for Presto below using Hive.

$ hive
CREATE TABLE trips_orc_zstd_presto (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION '/trips_orc_zstd_presto/'
  TBLPROPERTIES ("orc.compress"="zstd");

I'll then use Presto to convert the CSV data into ZStandard-compressed ORC files.

$ presto-cli \
    --schema default \
    --catalog hive

The following took 37 mins and 44 seconds.

INSERT INTO trips_orc_zstd_presto
SELECT * FROM trips_csv;

The above generated 79 GB of data (excluding HDFS replication).

$ hdfs dfs -du -s -h /trips_orc_zstd_presto/
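As a quick sanity check, the two dataset sizes reported above can be compared directly. The variable names are mine; the sizes are the 118.6 GB and 79 GB figures quoted in this post.

```python
# Dataset sizes reported in this post, excluding HDFS replication.
SNAPPY_ORC_GB = 118.6
ZSTD_ORC_GB = 79.0

size_ratio = SNAPPY_ORC_GB / ZSTD_ORC_GB
saved_gb = SNAPPY_ORC_GB - ZSTD_ORC_GB
saved_pct = saved_gb / SNAPPY_ORC_GB * 100

print(f"Snappy output is {size_ratio:.2f}x the size of the ZStandard output")
print(f"ZStandard saves {saved_gb:.1f} GB ({saved_pct:.1f}%)")
```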

Presto ORC Benchmark: Snappy versus ZStandard

ZStandard did a good job of saving space on HDFS and still converted the data in a very short amount of time. Below I'll look at the impact of the two compression schemes on query performance.

The following were the fastest times I saw after running each query multiple times.

$ presto-cli \
    --schema default \
    --catalog hive

These four queries were run on the Snappy-compressed, ORC-formatted dataset.

The following completed in 5.48 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc_snappy_presto
GROUP BY cab_type;

The following completed in 7.85 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_snappy_presto
GROUP BY passenger_count;

The following completed in 8.55 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_snappy_presto
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 11.92 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_snappy_presto
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

These four queries were run on the ZStandard-compressed, ORC-formatted dataset.

The following completed in 4.21 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc_zstd_presto
GROUP BY cab_type;

The following completed in 5.97 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_zstd_presto
GROUP BY passenger_count;

The following completed in 7.3 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_zstd_presto
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 11.68 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_zstd_presto
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
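To compare the two sets of timings side by side, the sketch below computes how much less time each query took on the ZStandard-compressed table. The timings are the ones quoted above; the list and function names are only for illustration.

```python
# Fastest times in seconds for queries 1 to 4, as reported above.
snappy_seconds = [5.48, 7.85, 8.55, 11.92]
zstd_seconds = [4.21, 5.97, 7.3, 11.68]


def percent_reduction(before, after):
    """Percentage of time saved going from `before` to `after`."""
    return (before - after) / before * 100


reductions = [
    round(percent_reduction(snappy, zstd), 1)
    for snappy, zstd in zip(snappy_seconds, zstd_seconds)
]

for query_number, reduction in enumerate(reductions, start=1):
    print(f"Query {query_number}: {reduction}% less time with ZStandard")
```

The gap is widest on the two simplest queries and nearly disappears on the fourth.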

Thoughts on the Results

Hive being nearly twice as fast as Spark at converting CSVs to ORC files took me by surprise as Spark has a younger code base. That being said, Presto being 1.5x faster than Hive was another shocker. I'm hoping that in publishing this post the community are made more aware of these performance differences and we can find improvements in future releases of all three packages.
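The ratios above can be reproduced from the Snappy-compressed ORC conversion times reported earlier in this post. The helper and dictionary names are hypothetical.

```python
def to_seconds(hours=0, minutes=0, seconds=0):
    """Convert a wall-clock duration into seconds."""
    return hours * 3600 + minutes * 60 + seconds


# CSV to Snappy-compressed ORC conversion times reported in this post.
conversion_seconds = {
    "hive": to_seconds(minutes=55, seconds=5),
    "spark": to_seconds(hours=1, minutes=43, seconds=7),
    "presto": to_seconds(minutes=37, seconds=35),
}

hive_vs_spark = conversion_seconds["spark"] / conversion_seconds["hive"]
presto_vs_hive = conversion_seconds["hive"] / conversion_seconds["presto"]

print(f"Hive was {hive_vs_spark:.2f}x faster than Spark")
print(f"Presto was {presto_vs_hive:.2f}x faster than Hive")
```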

Hive being a central store of catalogue data for so many tools in the Hadoop ecosystem cannot be ignored and I expect I will continue to use it for a long time to come. Where I'm put into an awkward position is in recommending that converting CSVs into ORCs should be done in Presto for significant time savings but querying that data afterward is quicker in Spark. There really is no one tool that rules them all.

ZStandard offers a lot in terms of disk space savings while not impacting query performance significantly. To make the ORC dataset 1.5x smaller than its Snappy-compressed equivalent while not impacting conversion time is fantastic. When Hadoop 3's Erasure Coding is mixed into the equation it'll bring space requirements down by 2/3rds on HDFS. Being able to buy a petabyte of storage capacity and store three times as much data on it is huge.
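One way to arrive at the two-thirds figure is sketched below. It assumes the comparison is between Snappy-compressed data under 3x replication and ZStandard-compressed data under a Reed-Solomon 6+3 erasure coding policy (six data blocks plus three parity blocks). The post doesn't spell out the policy, so treat both the policy and the names used here as assumptions.

```python
import math

# Raw bytes stored per logical byte under each scheme.
REPLICATION_OVERHEAD = 3.0          # three full copies of every block
ERASURE_CODING_OVERHEAD = (6 + 3) / 6  # assumed RS 6+3 policy: 1.5x

# ZStandard output was about 1.5x smaller than Snappy output in this post.
ZSTD_SHRINK_FACTOR = 1.5


def raw_storage_needed(logical_size, storage_overhead):
    """Raw disk space consumed by logical_size units of data."""
    return logical_size * storage_overhead


baseline = raw_storage_needed(1.0, REPLICATION_OVERHEAD)
improved = raw_storage_needed(1.0 / ZSTD_SHRINK_FACTOR, ERASURE_CODING_OVERHEAD)

reduction = 1 - improved / baseline
capacity_multiplier = baseline / improved

print(f"Space requirement falls by {reduction:.0%}")
print(f"The same disks hold {capacity_multiplier:.1f}x as much data")
```

Under those assumptions the smaller files and the lower storage overhead multiply together: 1.5x from compression times 2x from erasure coding gives 3x.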

I feel that my research into converting CSVs into ORC files is just getting started. Beyond fundamental architectural differences between these three pieces of software I suspect stock settings on EMR could be improved to provide faster conversion times. This isn't something I can prove at this point in time and this will be a subject of further research.

Spark did receive support for ZStandard in 2.3.0 so I suspect it's nothing more than a configuration change to add support into Spark on EMR. I'm going to keep an eye on future releases of AWS EMR to see if this is added to the stock settings.

Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2017 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.