In the Hadoop world there is almost always more than one way to accomplish a task. I prefer the platform because it's very unlikely I'm ever backed into a corner when working on a solution.
Hadoop has the ability to decouple storage from compute. The various distributed storage solutions supported all come with their own set of strong points and trade-offs. I often find myself needing to copy data back and forth between HDFS on AWS EMR and AWS S3 for performance reasons.
S3 is a great place to keep a master dataset as it can be used by many clusters without affecting the performance of any one of them; it also comes with 11 9s of durability, meaning it's one of the most unlikely places for data to go missing or become corrupted.
HDFS is where I find the best performance when running queries. If the workload will take long enough it's worth the time to copy a given dataset off of S3 and onto HDFS; any derivative results can then be transferred back onto S3 before the EMR cluster is terminated.
In this post I'll examine a number of different methods for copying data off of S3 and onto HDFS and see which is the fastest.
AWS EMR, Up & Running
To start, I'll launch an 11-node EMR cluster. I'll use the m3.xlarge instance type with 1 master node, 5 core nodes (these will make up the HDFS cluster) and 5 task nodes (these will run MapReduce jobs). I'm using spot pricing which often reduces the cost of the instances by 75-80% depending on market conditions. Both the EMR cluster and the S3 bucket are located in Ireland.
$ aws emr create-cluster \
--applications Name=Hadoop \
Name=Hive \
Name=Presto \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--ebs-root-volume-size 10 \
--ec2-attributes '{
"KeyName": "emr",
"InstanceProfile": "EMR_EC2_DefaultRole",
"AvailabilityZone": "eu-west-1c",
"EmrManagedSlaveSecurityGroup": "sg-89cd3eff",
"EmrManagedMasterSecurityGroup": "sg-d4cc3fa2"}' \
--enable-debugging \
--instance-groups '[{
"InstanceCount": 5,
"BidPrice": "OnDemandPrice",
"InstanceGroupType": "CORE",
"InstanceType": "m3.xlarge",
"Name": "Core - 2"
},{
"InstanceCount": 5,
"BidPrice": "OnDemandPrice",
"InstanceGroupType": "TASK",
"InstanceType": "m3.xlarge",
"Name": "Task - 3"
},{
"InstanceCount": 1,
"BidPrice": "OnDemandPrice",
"InstanceGroupType": "MASTER",
"InstanceType": "m3.xlarge",
"Name": "Master - 1"
}]' \
--log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
--name 'My cluster' \
--region eu-west-1 \
--release-label emr-5.21.0 \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--service-role EMR_DefaultRole \
--termination-protected
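The create-cluster call returns a cluster id. If you'd rather not watch the EMR console, the AWS CLI can block until the cluster is ready and then hand back the master node's public DNS name. This is a minimal sketch; the cluster id below is a placeholder.
$ CLUSTER_ID=j-XXXXXXXXXXXXX   # placeholder, returned by create-cluster
$ aws emr wait cluster-running \
    --cluster-id $CLUSTER_ID
$ aws emr describe-cluster \
    --cluster-id $CLUSTER_ID \
    --query 'Cluster.MasterPublicDnsName' \
    --output text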
After a few minutes the cluster has been launched and bootstrapped and I'm able to SSH in.
$ ssh -i ~/.ssh/emr.pem \
hadoop@ec2-54-155-42-195.eu-west-1.compute.amazonaws.com
__| __|_ )
_| ( / Amazon Linux AMI
___|\___|___|
https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/
1 package(s) needed for security, out of 9 available
Run "sudo yum update" to apply all updates.
EEEEEEEEEEEEEEEEEEEE MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M M::::::::M R:::::RRRRRR:::::R
E::::E EEEEE M:::::::::M M:::::::::M RR::::R R::::R
E::::E M::::::M:::M M:::M::::::M R:::R R::::R
E:::::EEEEEEEEEE M:::::M M:::M M:::M M:::::M R:::RRRRRR:::::R
E::::::::::::::E M:::::M M:::M:::M M:::::M R:::::::::::RR
E:::::EEEEEEEEEE M:::::M M:::::M M:::::M R:::RRRRRR::::R
E::::E M:::::M M:::M M:::::M R:::R R::::R
E::::E EEEEE M:::::M MMM M:::::M R:::R R::::R
EE:::::EEEEEEEE::::E M:::::M M:::::M R:::R R::::R
E::::::::::::::::::E M:::::M M:::::M RR::::R R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM MMMMMMM RRRRRRR RRRRRR
The five core nodes each have 68.95 GB of capacity, which together gives the HDFS cluster 344.75 GB of capacity.
$ hdfs dfsadmin -report \
| grep 'Configured Capacity'
Configured Capacity: 370168258560 (344.75 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
The dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together. This dataset is approximately 86 GB in ORC format spread across 56 files. The typical ORC file is ~1.6 GB in size.
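If you want to sanity-check the dataset's size and file count before copying anything, the AWS CLI can summarise a prefix. The bucket name is a placeholder here, as it is throughout this post.
$ aws s3 ls s3://<bucket>/orc/ \
    --summarize \
    --human-readable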
I'll create a filename manifest that I'll use for various operations below. I'll exclude the S3 URL prefix as these names will also be used to address files on HDFS.
$ vi files
000000_0
000001_0
000002_0
000003_0
000004_0
000005_0
000006_0
000007_0
000008_0
000009_0
000010_0
000011_0
000012_0
000013_0
000014_0
000015_0
000016_0
000017_0
000018_0
000019_0
000020_0
000021_0
000022_0
000023_0
000024_0
000025_0
000026_0
000027_0
000028_0
000029_0
000030_0
000031_0
000032_0
000033_0
000034_0
000035_0
000036_0
000037_0
000038_0
000039_0
000040_0
000041_0
000042_0
000043_0
000044_0
000045_0
000046_0
000047_0
000048_0
000049_0
000050_0
000051_0
000052_0
000053_0
000054_0
000055_0
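Rather than typing the manifest out by hand, it could also be generated from the bucket listing. This is a sketch that assumes the standard four-column output of aws s3 ls.
$ aws s3 ls s3://<bucket>/orc/ \
    | awk '{print $4}' \
    > files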
I'll adjust the AWS CLI's configuration to allow for up to 100 concurrent requests at any one time.
$ aws configure set \
default.s3.max_concurrent_requests \
100
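These settings are written to ~/.aws/config. Another transfer knob in the same family is the multipart chunk size; raising it from the CLI's 8 MB default may help with multi-gigabyte objects, though I haven't benchmarked that setting here.
$ aws configure set \
    default.s3.multipart_chunksize \
    64MB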
The disk space on the master node cannot hold the entire 86 GB worth of ORC files so I'll download, import onto HDFS and remove each file one at a time. This will allow me to maintain enough working disk space on the master node.
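A quick look at the local filesystem confirms how much scratch space there is to play with; where the instance-store volumes are mounted will vary, but on EMR they typically appear under /mnt.
$ df -h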
$ hdfs dfs -mkdir /orc
$ time (for FILE in `cat files`; do
aws s3 cp s3://<bucket>/orc/$FILE ./
hdfs dfs -copyFromLocal $FILE /orc/
rm $FILE
done)
The above completed in 15 minutes and 57 seconds.
The HDFS CLI runs on the JVM, which comes with a fair amount of start-up overhead. In my HDFS CLI benchmark I found the alternative CLI gohdfs could save a lot of start-up time as it's written in Go and doesn't run on the JVM. Below I've run the same operation using gohdfs.
$ wget -c -O gohdfs.tar.gz \
https://github.com/colinmarc/hdfs/releases/download/v2.0.0/gohdfs-v2.0.0-linux-amd64.tar.gz
$ tar xvf gohdfs.tar.gz
I'll first clear the previously imported dataset off of HDFS so there is enough space on the cluster going forward. With triple replication, 86 GB turns into 258 GB on disk and there is only 344.75 GB of HDFS capacity in total.
$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (for FILE in `cat files`; do
aws s3 cp s3://<bucket>/orc/$FILE ./
gohdfs-v2.0.0-linux-amd64/hdfs put \
$FILE \
hdfs://ip-10-10-207-160.eu-west-1.compute.internal:8020/orc/
rm $FILE
done)
The above took 27 minutes and 40 seconds. I wasn't expecting this client to be almost twice as slow as the HDFS CLI. S3 has given me consistent performance when running other tools multiple times, so I suspect either the code behind the put functionality could be optimised or there might be a more appropriate endpoint for copying multi-gigabyte files onto HDFS. As of this writing I can't find support in gohdfs for copying from S3 to HDFS directly without resorting to a filesystem FUSE.
The HDFS CLI does support copying from S3 to HDFS directly. Below I'll copy the 56 ORC files to HDFS straight from S3. I'll set the concurrent process limit to 8.
$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (cat files \
| xargs -n 1 \
-P 8 \
-I % \
hdfs dfs -cp s3://<bucket>/orc/% /orc/)
The above took 14 minutes and 17 seconds. There wasn't much of an improvement over simply copying the files down one at a time and uploading them to HDFS.
I'll try the above command again but set the concurrency limit to 16 processes. Note, this is running on the master node which has 15 GB of RAM and the following will use what little memory capacity is left on the machine.
$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (cat files \
| xargs -n 1 \
-P 16 \
-I % \
hdfs dfs -cp s3://<bucket>/orc/% /orc/)
The above took 14 minutes and 36 seconds. Again, a very similar time despite a higher concurrency limit. The effective transfer rate was ~98.9 MB/s off of S3. HDFS is configured for triple redundancy but I expect there is a lot more throughput available with a cluster of this size.
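For reference, the replication factor EMR has configured, and the dataset's footprint on HDFS, can both be checked directly.
$ hdfs getconf -confKey dfs.replication   # replication factor configured for this cluster
$ hdfs dfs -du -s -h /orc                 # logical size; recent Hadoop versions also print the size including replicas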
DistCp (distributed copy) is bundled with Hadoop and uses MapReduce to copy files in a distributed manner. It can work with HDFS, AWS S3, Azure Blob Storage and Google Cloud Storage. It breaks the downloading and importing up across the task nodes so that all five of those machines work on a single job, rather than the master node being the one machine downloading and importing onto HDFS.
$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (hadoop distcp s3://<bucket>/orc/* /orc)
The above completed in 6 minutes and 16 seconds. A huge improvement over the previous methods.
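DistCp's parallelism is governed by the number of map tasks it launches, and by default this is fairly conservative. The -m flag raises the cap on simultaneous copies; with only 56 files the benefit is limited, but it's a cheap thing to experiment with. A sketch:
$ hadoop distcp \
    -m 56 \
    s3://<bucket>/orc/* \
    /orc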
On AWS EMR, there is a tool called S3DistCp that aims to provide the functionality of Hadoop's DistCp but in a fashion optimised for S3. Like DistCp, it uses MapReduce for executing its operations.
$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (s3-dist-cp \
--src=s3://<bucket>/orc/ \
--dest=hdfs:///orc/)
The above completed in 5 minutes and 59 seconds. This gives an effective throughput of ~241 MB/s off of S3. There wasn't a huge performance increase over DistCp and I suspect neither tool can greatly out-perform the other.
I did come across settings to increase the chunk size from 128 MB to 1 GB, which would be useful for larger files. However, enough tooling in the Hadoop ecosystem suffers from ballooning memory requirements with files over 2 GB that it is very rare to see files larger than that in a sensibly-deployed production environment. S3 usually has low connection setup latency, so I can't see the smaller chunk size being a huge overhead.
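If I've understood the documentation correctly, the setting in question is S3DistCp's multipart chunk size, which is given in MiB and defaults to 128. Treat the flag below as an untested sketch rather than something I benchmarked.
$ s3-dist-cp \
    --src=s3://<bucket>/orc/ \
    --dest=hdfs:///orc/ \
    --multipartUploadChunkSize=1024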
With the above, it's now clear that both DistCp and S3DistCp can leverage a cluster's task nodes to import data from S3 onto HDFS quickly. I'm going to see how well these tools scale with a 21-node m3.xlarge cluster. This cluster will have 1 master node, 10 core nodes and 10 task nodes.
$ aws emr create-cluster \
--applications Name=Hadoop \
Name=Hive \
Name=Presto \
--auto-scaling-role EMR_AutoScaling_DefaultRole \
--ebs-root-volume-size 10 \
--ec2-attributes '{
"KeyName": "emr",
"InstanceProfile": "EMR_EC2_DefaultRole",
"AvailabilityZone": "eu-west-1c",
"EmrManagedSlaveSecurityGroup": "sg-89cd3eff",
"EmrManagedMasterSecurityGroup": "sg-d4cc3fa2"}' \
--enable-debugging \
--instance-groups '[{
"InstanceCount": 1,
"BidPrice": "OnDemandPrice",
"InstanceGroupType": "MASTER",
"InstanceType": "m3.xlarge",
"Name": "Master - 1"
},{
"InstanceCount": 10,
"BidPrice": "OnDemandPrice",
"InstanceGroupType": "CORE",
"InstanceType": "m3.xlarge",
"Name": "Core - 2"
},{
"InstanceCount": 10,
"BidPrice": "OnDemandPrice",
"InstanceGroupType": "TASK",
"InstanceType": "m3.xlarge",
"Name": "Task - 3"
}]' \
--log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
--name 'My cluster' \
--region eu-west-1 \
--release-label emr-5.21.0 \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--service-role EMR_DefaultRole \
--termination-protected
With the new EMR cluster up and running I can SSH into it.
$ ssh -i ~/.ssh/emr.pem \
hadoop@ec2-54-78-53-9.eu-west-1.compute.amazonaws.com
Each core node on the HDFS cluster still has 68.95 GB of capacity but the ten machines combined create 689.49 GB of HDFS storage capacity.
$ hdfs dfsadmin -report \
| grep 'Configured Capacity'
Configured Capacity: 740336517120 (689.49 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
I'll run S3DistCp first.
$ hdfs dfs -mkdir /orc
$ time (s3-dist-cp \
--src=s3://<bucket>/orc/ \
--dest=hdfs:///orc/)
The above completed in 4 minutes and 56 seconds. This is an improvement over the 11-node cluster but not the 2x improvement I was expecting.
Below is DistCp running on the 21-node cluster.
$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc
$ time (hadoop distcp s3://<bucket>/orc/* /orc)
The above completed in 4 minutes and 44 seconds.
The performance ratio between these two tools is more or less consistent between cluster sizes. It's a shame neither showed linear scaling with twice the number of core and task nodes.
Here is a recap of the transfer times seen in this post.
Duration | Transfer Method |
---|---|
27m40s | gohdfs, Sequentially |
15m57s | HDFS DFS CLI, Sequentially |
14m36s | HDFS DFS CLI, Concurrently x 16 |
14m17s | HDFS DFS CLI, Concurrently x 8 |
6m16s | Hadoop DistCp, 11-Node Cluster |
5m59s | S3DistCp, 11-Node Cluster |
4m56s | S3DistCp, 21-Node Cluster |
4m44s | Hadoop DistCp, 21-Node Cluster |
Why Use HDFS At All?
S3 is excellent for durability and doesn't suffer performance-wise if you have one cluster or ten clusters pointed at it. S3 also works as well as HDFS when appending records to a dataset. Both of the following queries will run without issue.
$ presto-cli \
--schema default \
--catalog hive
INSERT INTO trips_hdfs
SELECT *
FROM trips_hdfs
LIMIT 10;
INSERT INTO trips_s3
SELECT *
FROM trips_s3
LIMIT 10;
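A quick count on each table is enough to confirm the appended rows landed.
SELECT count(*) FROM trips_hdfs;
SELECT count(*) FROM trips_s3;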
I've heard arguments that S3 is as fast as HDFS but I've never witnessed this in my time with both technologies. Below I'll run a benchmark on the 1.1 billion taxi trips. I'll have one table using S3-backed data and another table using HDFS-backed data. This was run on the 11-node cluster. I ran each query multiple times and recorded the fastest times.
$ hive
This is the HDFS-backed table.
CREATE EXTERNAL TABLE trips_hdfs (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION '/orc/';
This is the S3-backed table.
CREATE EXTERNAL TABLE trips_s3 (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION 's3://<bucket>/orc/';
I'll use Presto to run the benchmarks.
$ presto-cli \
--schema default \
--catalog hive
The following four queries were run on the HDFS-backed table.
The following completed in 6.77 seconds.
SELECT cab_type,
count(*)
FROM trips_hdfs
GROUP BY cab_type;
The following completed in 10.97 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_hdfs
GROUP BY passenger_count;
The following completed in 13.38 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_hdfs
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 19.82 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_hdfs
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
The following four queries were run on the S3-backed table.
The following completed in 10.82 seconds.
SELECT cab_type,
count(*)
FROM trips_s3
GROUP BY cab_type;
The following completed in 14.73 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips_s3
GROUP BY passenger_count;
The following completed in 19.19 seconds.
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_s3
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 24.61 seconds.
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_s3
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
This is a recap of the query times above.
HDFS on AWS EMR | AWS S3 | Speed Up | Query |
---|---|---|---|
6.77s | 10.82s | 1.6x | Query 1 |
10.97s | 14.73s | 1.34x | Query 2 |
13.38s | 19.19s | 1.43x | Query 3 |
19.82s | 24.61s | 1.24x | Query 4 |
The HDFS-backed queries were anywhere from 1.24x to 1.6x faster than the S3-backed queries.