Home | Benchmarks | Categories | Atom Feed

Posted on Wed 20 March 2019 under Hadoop

Faster File Distribution with HDFS and S3

In the Hadoop world there is almost always more than one way to accomplish a task. I prefer the platform because it's very unlikely I'm ever backed into a corner when working on a solution.

Hadoop has the ability to decouple storage from compute. The various distributed storage solutions supported all come with their own set of strong points and trade-offs. I often find myself needing to copy data back and forth between HDFS on AWS EMR and AWS S3 for performance reasons.

S3 is a great place to keep a master dataset as it can be used among many clusters without affect the performance of any one of them; it also comes with 11 9s of durability meaning it's one of the most unlikely places for data to go missing or become corrupt.

HDFS is where I find the best performance when running queries. If the workload will take long enough it's worth the time to copy a given dataset off of S3 and onto HDFS; any derivative results can then be transferred back onto S3 before the EMR cluster is terminated.

In this post I'll examine a number of different methods for copying data off of S3 and onto HDFS and see which is the fastest.

AWS EMR, Up & Running

To start, I'll launch an 11-node EMR cluster. I'll use the m3.xlarge instance type with 1 master node, 5 core nodes (these will make up the HDFS cluster) and 5 task nodes (these will run MapReduce jobs). I'm using spot pricing which often reduces the cost of the instances by 75-80% depending on market conditions. Both the EMR cluster and the S3 bucket are located in Ireland.

$ aws emr create-cluster
    --applications Name=Hadoop \
                   Name=Hive \
                   Name=Presto \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --ebs-root-volume-size 10 \
    --ec2-attributes '{
        "KeyName": "emr",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "AvailabilityZone": "eu-west-1c",
        "EmrManagedSlaveSecurityGroup": "sg-89cd3eff",
        "EmrManagedMasterSecurityGroup": "sg-d4cc3fa2"}' \
    --enable-debugging \
    --instance-groups '[{
        "InstanceCount": 5,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "CORE",
        "InstanceType": "m3.xlarge",
        "Name": "Core - 2"
    },{
        "InstanceCount": 5,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "TASK",
        "InstanceType": "m3.xlarge",
        "Name": "Task - 3"
    },{
        "InstanceCount": 1,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "MASTER",
        "InstanceType": "m3.xlarge",
        "Name": "Master - 1"
    }]' \
    --log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
    --name 'My cluster' \
    --region eu-west-1 \
    --release-label emr-5.21.0 \
    --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
    --service-role EMR_DefaultRole \
    --termination-protected

After a few minutes the cluster has been launched and bootstrapped and I'm able to SSH in.

$ ssh -i ~/.ssh/emr.pem \
    hadoop@ec2-54-155-42-195.eu-west-1.compute.amazonaws.com
       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/
1 package(s) needed for security, out of 9 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

The five core nodes each have 68.95 GB of capacity that together create 344.75 GB of capacity across the HDFS cluster.

$ hdfs dfsadmin -report \
    | grep 'Configured Capacity'
Configured Capacity: 370168258560 (344.75 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)

The dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together. This dataset is approximately 86 GB in ORC format spread across 56 files. The typical ORC file is ~1.6 GB in size.

I'll create a filename manifest that I'll use for various operations below. I'll exclude the S3 URL prefix as these names will also be used to address files on HDFS as well.

$ vi files
000000_0
000001_0
000002_0
000003_0
000004_0
000005_0
000006_0
000007_0
000008_0
000009_0
000010_0
000011_0
000012_0
000013_0
000014_0
000015_0
000016_0
000017_0
000018_0
000019_0
000020_0
000021_0
000022_0
000023_0
000024_0
000025_0
000026_0
000027_0
000028_0
000029_0
000030_0
000031_0
000032_0
000033_0
000034_0
000035_0
000036_0
000037_0
000038_0
000039_0
000040_0
000041_0
000042_0
000043_0
000044_0
000045_0
000046_0
000047_0
000048_0
000049_0
000050_0
000051_0
000052_0
000053_0
000054_0
000055_0

I'll adjust the AWS CLI's configuration to allow for up to 100 concurrent requests at any one time.

$ aws configure set \
    default.s3.max_concurrent_requests \
    100

The disk space on the master node cannot hold the entire 86 GB worth of ORC files so I'll download, import onto HDFS and remove each file one at a time. This will allow me to maintain enough working disk space on the master node.

$ hdfs dfs -mkdir /orc
$ time (for FILE in `cat files`; do
            aws s3 cp s3://<bucket>/orc/$FILE ./
            hdfs dfs -copyFromLocal $FILE /orc/
            rm $FILE
        done)

The above completed 15 minutes and 57 seconds.

The HDFS CLI uses the JVM which comes with a fair amount of overhead. In my HDFS CLI benchmark I found the alternative CLI gohdfs could save a lot of start-up time as it is written in GoLang and doesn't run on the JVM. Below I've run the same operation using gohdfs.

$ wget -c -O gohdfs.tar.gz \
    https://github.com/colinmarc/hdfs/releases/download/v2.0.0/gohdfs-v2.0.0-linux-amd64.tar.gz
$ tar xvf gohdfs.tar.gz

I'll clear out the previously downloaded dataset off HDFS first so there is enough space on the cluster going forward. With triple replication, 86 GB turns into 258 GB on disk and there is only 344.75 GB of HDFS capacity in total.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (for FILE in `cat files`; do
            aws s3 cp s3://<bucket>/orc/$FILE ./
            gohdfs-v2.0.0-linux-amd64/hdfs put \
                $FILE \
                hdfs://ip-10-10-207-160.eu-west-1.compute.internal:8020/orc/
            rm $FILE
        done)

The above took 27 minutes and 40 seconds. I wasn't expecting this client to be almost twice as slow as the HDFS CLI. S3 provides consistent performance when I've run other tools multiple times so I suspect either the code behind the put functionality could be optimised or there might be a more appropriate endpoint for copying multi-gigabyte files onto HDFS. As of this writing I can't find support for copying from S3 to HDFS directly with resorting to a file system fuse.

The HDFS CLI does support copying from S3 to HDFS directly. Below I'll copy the 56 ORC files to HDFS straight from S3. I'll set the concurrent process limit to 8.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (cat files \
        | xargs -n 1 \
                -P 8 \
                -I % \
            hdfs dfs -cp s3://<bucket>/orc/% /orc/)

The above took 14 minutes and 17 seconds. There wasn't much of an improvement over simply copying the files down one at a time and uploading them to HDFS.

I'll try the above command again but set the concurrency limit to 16 processes. Note, this is running on the master node which has 15 GB of RAM and the following will use what little memory capacity is left on the machine.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (cat files \
        | xargs -n 1 \
                -P 16 \
                -I % \
            hdfs dfs -cp s3://<bucket>/orc/% /orc/)

The above took 14 minutes and 36 seconds. Again, a very similar time despite a higher concurrency limit. The effective transfer rate was ~98.9 MB/s off of S3. HDFS is configured for triple redundancy but I expect there is a lot more throughput available with a cluster of this size.

DistCp (distributed copy) is bundled with Hadoop and uses MapReduce to copy files in a distributed manner. It can work with HDFS, AWS S3, Azure Blob Storage and Google Cloud Storage. It can break up the downloading and importing across the task nodes so all five machines can work on a single job instead of the master node being the single machine downloading and importing onto HDFS.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (hadoop distcp s3://<bucket>/orc/* /orc)

The above completed in 6 minutes and 16 seconds. A huge improvement over the previous methods.

On AWS EMR, there is a tool called S3DistCp that aims to provide the functionality of Hadoop's DistCp but in a fashion optimised for S3. Like DistCp, it uses MapReduce for executing its operations.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (s3-dist-cp \
            --src=s3://<bucket>/orc/ \
            --dest=hdfs:///orc/)

The above completed in 5 minutes and 59 seconds. This gives an effective throughput of ~241 MB/s off of S3. There wasn't a huge performance increase over DistCp and I suspect neither tool can greatly out-perform the other.

I did come across settings to increase the chunk size from 128 MB to 1 GB, which would be useful for larger files but enough tooling in the Hadoop ecosystem will suffer from ballooning memory requirements with files over 2 GB that it is very rare to see files larger than this in any sensibly-deployed production environment. S3 usually has low connection setup latency so I can't see this being a huge overhead.

With the above its now understood that both DistCp and S3DistCp can leverage a cluster's task nodes to import data from S3 onto HDFS quickly. I'm going to see how well these tools scale with a 21-node m3.xlarge cluster. This cluster will have 1 master node, 10 core nodes and 10 task nodes.

$ aws emr create-cluster \
    --applications Name=Hadoop \
                   Name=Hive \
                   Name=Presto \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --ebs-root-volume-size 10 \
    --ec2-attributes '{
        "KeyName": "emr",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "AvailabilityZone": "eu-west-1c",
        "EmrManagedSlaveSecurityGroup": "sg-89cd3eff",
        "EmrManagedMasterSecurityGroup": "sg-d4cc3fa2"}' \
    --enable-debugging \
    --instance-groups '[{
        "InstanceCount": 1,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "MASTER",
        "InstanceType": "m3.xlarge",
        "Name": "Master - 1"
    },{
        "InstanceCount": 10,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "CORE",
        "InstanceType": "m3.xlarge",
        "Name": "Core - 2"
    },{
        "InstanceCount": 10,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "TASK",
        "InstanceType": "m3.xlarge",
        "Name": "Task - 3"
    }]' \
    --log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
    --name 'My cluster' \
    --region eu-west-1 \
    --release-label emr-5.21.0 \
    --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
    --service-role EMR_DefaultRole \
    --termination-protected

With the new EMR cluster up and running I can SSH into it.

$ ssh -i ~/.ssh/emr.pem \
    hadoop@ec2-54-78-53-9.eu-west-1.compute.amazonaws.com

Each core node on the HDFS cluster still has 68.95 GB of capacity but the ten machines combined create 689.49 GB of HDFS storage capacity.

$ hdfs dfsadmin -report \
    | grep 'Configured Capacity'
Configured Capacity: 740336517120 (689.49 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)

I'll run S3DistCp first.

$ hdfs dfs -mkdir /orc
$ time (s3-dist-cp \
            --src=s3://<bucket>/orc/ \
            --dest=hdfs:///orc/)

The above completed in 4 minutes and 56 seconds. This is an improvement over the 11-node cluster but not the 2x improvement I was expecting.

Below is DistCp running on the 21-node cluster.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (hadoop distcp s3://<bucket>/orc/* /orc)

The above completed in 4 minutes and 44 seconds.

The performance ratio between these two tools is more or less consistent between cluster sizes. Its a shame neither showed linear scaling with twice the number of core and task nodes.

Here is a recap of the transfer times seen in this post.

Duration Transfer Method
27m40s gohdfs, Sequentially
15m57s HDFS DFS CLI, Sequentially
14m36s HDFS DFS CLI, Concurrently x 16
14m17s HDFS DFS CLI, Concurrently x 8
6m16s Hadoop DistCp, 11-Node Cluster
5m59s S3DistCp, 11-Node Cluster
4m56s S3DistCp, 21-Node Cluster
4m44s Hadoop DistCp, 21-Node Cluster

Why Use HDFS At All?

S3 is excellent for durability and doesn't suffer performance-wise if you have one cluster or ten clusters pointed at it. S3 also works as well as HDFS when appending records to a dataset. Both of the following queries will run without issue.

$ presto-cli \
    --schema default \
    --catalog hive
INSERT INTO trips_hdfs
SELECT *
FROM trips_hdfs
LIMIT 10;
INSERT INTO trips_s3
SELECT *
FROM trips_s3
LIMIT 10;

I've heard arguments that S3 is as fast as HDFS but I've never witnessed this in my time with both technologies. Below I'll run a benchmark on the 1.1 billion taxi trips. I'll have one table using S3-backed data and another table using HDFS-backed data. This was run on the 11-node cluster. I ran each query multiple times and recorded the fastest times.

$ hive

This is the HDFS-backed table.

CREATE EXTERNAL TABLE trips_hdfs (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION '/orc/';

This is the S3-backed table.

CREATE EXTERNAL TABLE trips_s3 (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  LOCATION 's3://<bucket>/orc/';

I'll use Presto to run the benchmarks.

$ presto-cli \
    --schema default \
    --catalog hive

The following four queries were run on the HDFS-backed table.

The following completed in 6.77 seconds.

SELECT cab_type,
       count(*)
FROM trips_hdfs
GROUP BY cab_type;

The following completed in 10.97 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_hdfs
GROUP BY passenger_count;

The following completed in 13.38 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_hdfs
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 19.82 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_hdfs
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

The following four queries were run on the S3-backed table.

The following completed in 10.82 seconds.

SELECT cab_type,
       count(*)
FROM trips_s3
GROUP BY cab_type;

The following completed in 14.73 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_s3
GROUP BY passenger_count;

The following completed in 19.19 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_s3
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 24.61 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_s3
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

This is a recap of the query times above.

HDFS on AWS EMR AWS S3 Speed Up Query
6.77s 10.82s 1.6x Query 1
10.97s 14.73s 1.34x Query 2
13.38s 19.19s 1.43x Query 3
19.82s 24.61s 1.24x Query 4

The HDFS-backed queries were anywhere from 1.24x to 1.6x faster than the S3-backed queries.

Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2019 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.