This blog post will cover how I took a billion+ records containing six years of taxi ride metadata in New York City and analysed them using Spark SQL on Amazon EMR. I stored the data on S3 instead of HDFS so that I could launch EMR clusters only when I need them, while paying just a few dollars a month to store the data permanently on S3.
The links in this blog post pointing to the AWS console use the eu-west-1 region, which may or may not be the most suitable location for you. You can switch regions by clicking the drop-down that reads "Ireland" in the top right of the console.
AWS CLI Up and Running
All the following commands were run on a fresh install of Ubuntu 14.04.3.
To start, I'll install the AWS CLI tool and a few dependencies it needs to run.
$ sudo apt update
$ sudo apt install \
python-pip \
python-virtualenv
EMR's CLI endpoint gets a fair amount of maintenance, so ensure you're running at least version 1.10.14 of the AWS CLI tool.
$ virtualenv amazon
$ source amazon/bin/activate
$ pip install --upgrade \
https://github.com/aws/aws-cli/archive/1.10.14.zip
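Once that's finished it's worth checking which version ended up on your PATH; the command below should report aws-cli/1.10.14 or newer.
$ aws --version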
The following will capture AWS credentials used by the AWS CLI tool.
$ read AWS_ACCESS_KEY_ID
$ read AWS_SECRET_ACCESS_KEY
$ export AWS_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY
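A quick way to check the keys were picked up properly is to list your S3 buckets; if anything is wrong this call will come back with an error rather than a bucket listing.
$ aws s3 ls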
Launching the Spark Cluster on EMR
The following will launch a 5-node cluster: one master node, two core nodes and two task nodes. I'll only install Hive and Spark on this cluster.
If you haven't launched an EMR cluster before then I suggest doing so via the AWS Console. Make sure you turn on advanced options so you can pick spot instances instead of on-demand instances for your core and task nodes. When you use the EMR wizard it'll create the roles and security groups needed for your cluster nodes to communicate with one another.
Before you run the command below, generate a key pair called "emr". Download the emr.pem file and store it in the ~/.ssh/ folder on your system. This file is the SSH private key that lets you access the Hadoop cluster nodes via SSH.
Below you will need to change a few items before executing the create-cluster command:
- If the key name you generated was called anything other than emr then change the KeyName attribute.
- The InstanceProfile is set to the name the wizard automatically generated. If you haven't used the wizard before then make sure this value matches your role.
- The --region and AvailabilityZone parameters are set to eu-west-1 and eu-west-1a respectively; change these if this region isn't the most appropriate for you. The region should match the one where you've stored the taxi trip data on S3.
- The log-uri parameter's bucket name needs to be changed to a bucket name of your own that hasn't already been taken.
- The BidPrice amounts are in US Dollars and were appropriate at the time I was bidding. Please check the current spot prices for m3.xlarge instances in your region and adjust these accordingly. There is a pricing history button on the EC2 Spot Requests page, and you can also pull the price history via the CLI as shown after this list.
- You might want to consider adding more task nodes as these are the only nodes doing any serious work when your queries are executing via Spark SQL.
- EMR release 4.3.0 is being used. Amazon released 4.4.0 on March 14th and I haven't yet had a chance to try it out. Consider using the latest EMR release and keep an eye on Jeff Barr's EMR blog posts where he announces new releases.
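If you want to check spot prices from the command line instead of the console, something along these lines should pull the recent history for m3.xlarge instances. The availability zone below matches the one used in the cluster definition; swap it for your own if needed.
$ aws ec2 describe-spot-price-history \
    --instance-types m3.xlarge \
    --product-descriptions "Linux/UNIX" \
    --availability-zone eu-west-1a \
    --region eu-west-1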
$ aws emr create-cluster \
--applications \
Name=Hive \
Name=Spark \
--ec2-attributes '{
"KeyName": "emr",
"InstanceProfile": "EMR_EC2_DefaultRole",
"AvailabilityZone": "eu-west-1a",
"EmrManagedSlaveSecurityGroup": "sg-89cd3eff",
"EmrManagedMasterSecurityGroup": "sg-d4cc3fa2"
}' \
--release-label emr-4.3.0 \
--service-role EMR_DefaultRole \
--log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
--name 'A Billion Taxi Trips' \
--instance-groups '[{
"InstanceCount": 2,
"BidPrice": "0.048",
"InstanceGroupType": "CORE",
"InstanceType": "m3.xlarge",
"Name": "Core instance group - 2"
}, {
"InstanceCount": 2,
"BidPrice": "0.048",
"InstanceGroupType": "TASK",
"InstanceType": "m3.xlarge",
"Name": "Task instance group - 3"
}, {
"InstanceCount": 1,
"InstanceGroupType": "MASTER",
"InstanceType": "m3.xlarge",
"Name": "Master instance group - 1"
}]' \
--region eu-west-1
Once that has executed, make sure you add access for your IP address on port 22 to the ElasticMapReduce-master security group so that you can SSH into the master node.
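You can do this in the EC2 console, or with something like the following. The group id is the master security group from my create-cluster call above and 1.2.3.4/32 is a placeholder for your own IP address.
$ aws ec2 authorize-security-group-ingress \
    --group-id sg-d4cc3fa2 \
    --protocol tcp \
    --port 22 \
    --cidr 1.2.3.4/32 \
    --region eu-west-1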
The cluster can take 20 to 30 minutes to finish provisioning and bootstrapping all of the nodes, so keep an eye on its status in the EMR console.
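If you'd rather poll from the terminal, the create-cluster command prints out a cluster id which you can feed into describe-cluster; the id below is just a placeholder.
$ aws emr describe-cluster \
    --cluster-id j-XXXXXXXXXXXXX \
    --region eu-west-1 \
    --query 'Cluster.Status.State'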
Creating Tables in the Hive Metastore
To SSH into the cluster, change the EC2 hostname below to the hostname of your master node.
$ ssh -i ~/.ssh/emr.pem \
hadoop@ec2-54-170-20-13.eu-west-1.compute.amazonaws.com
The first command to run will fix the permissions on the hive logs folder.
$ sudo su -c 'mkdir -p /var/log/hive/user/hadoop &&
touch /var/log/hive/user/hadoop/hive.log &&
chown -R hadoop /var/log/hive/user/hadoop'
In my Billion Taxi Rides in Redshift blog post I took the raw taxi trip metadata and created denormalised CSV files that were gzip compressed.
In my Billion Taxi Rides on Amazon EMR running Presto blog post I turned that CSV data into ORC files and stored them on S3. The following shows that data as it currently lives on S3.
$ aws s3 ls <s3_bucket>/orc/
2016-03-14 13:54:41 398631347 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000000
2016-03-14 13:54:40 393489828 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000001
2016-03-14 13:54:40 339863956 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000002
....
2016-03-14 14:00:26 266048453 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000189
2016-03-14 14:00:23 265076102 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000190
2016-03-14 14:00:12 115110402 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000191
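If you're curious how much ORC data is sitting there in total, the s3 ls command can recurse and summarise the folder for you.
$ aws s3 ls s3://<s3_bucket>/orc/ \
    --recursive \
    --summarize \
    --human-readable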
Spark SQL doesn't interact with ORC files as quickly as it can with Parquet files, so below I'll create tables representing the data in both ORC and Parquet formats and then copy the data from the ORC table into the Parquet table.
The tables are created with the EXTERNAL attribute, which means Hive won't take full control of the files they represent. If the trips_orc table were declared without the EXTERNAL attribute, dropping it would wipe out any existing data in the s3://<s3_bucket>/orc/ folder.
But with the EXTERNAL attribute, the trips_orc table will discover the existing files in the s3://<s3_bucket>/orc/ folder and the trips_parquet table will start to store data in the s3://<s3_bucket>/parquet/ folder even though I haven't created that folder beforehand.
$ hive
CREATE EXTERNAL TABLE trips_orc (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS orc
LOCATION 's3://<s3_bucket>/orc/';
CREATE EXTERNAL TABLE trips_parquet (
trip_id INT,
vendor_id STRING,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag STRING,
rate_code_id SMALLINT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count SMALLINT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
payment_type STRING,
trip_type SMALLINT,
pickup STRING,
dropoff STRING,
cab_type STRING,
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel STRING,
pickup_borocode SMALLINT,
pickup_boroname STRING,
pickup_ct2010 STRING,
pickup_boroct2010 STRING,
pickup_cdeligibil STRING,
pickup_ntacode STRING,
pickup_ntaname STRING,
pickup_puma STRING,
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel STRING,
dropoff_borocode SMALLINT,
dropoff_boroname STRING,
dropoff_ct2010 STRING,
dropoff_boroct2010 STRING,
dropoff_cdeligibil STRING,
dropoff_ntacode STRING,
dropoff_ntaname STRING,
dropoff_puma STRING
) STORED AS parquet
LOCATION 's3://<s3_bucket>/parquet/';
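Before kicking off the long-running copy below it's worth a quick sanity check that the trips_orc table can actually see the files on S3; pulling back a handful of rows should return data rather than an empty result set.
SELECT trip_id, cab_type, pickup_datetime
FROM trips_orc
LIMIT 5;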
1.1 Billion Trips in Parquet Format
The following will copy all the data in the trips_orc table into the trips_parquet table. This process should take around two hours but could be quicker if you add more task nodes to your cluster.
$ screen
$ echo "INSERT INTO TABLE trips_parquet
SELECT * FROM trips_orc;" | hive
Benchmarking Queries in Spark SQL
Make sure Spark is using at least 8 GB of memory when you launch it. The fourth query executed in the benchmark will run out of heap space with the amount of memory specified in the stock EMR 4.3.0 configuration settings.
$ SPARK_MEM=${SPARK_MEM:-8196m}
$ export SPARK_MEM
$ JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
The following will launch the Spark SQL CLI interface.
$ spark-sql
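If you'd rather not rely on environment variables, spark-sql passes spark-submit options through, so the driver memory can also be set explicitly when launching the CLI; the value below simply mirrors the 8 GB figure above.
$ spark-sql --driver-memory 8g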
I was surprised by how quickly Spark SQL could query the data. Unlike Presto, which could use both core and task nodes to complete queries, Spark only used the task nodes to do the heavy lifting.
Though Spark can do analytics, its main strengths seem to be in running machine learning code, graph analysis and other iterative, memory-hungry workloads. So it might not be entirely fair to compare these results with the Presto benchmarks I ran in the Presto on EMR blog post. Nonetheless, it's interesting to see such differences in performance.
The following completed in 4 minutes and 24 seconds (3.5x slower than Presto).
SELECT cab_type,
count(*)
FROM trips_parquet
GROUP BY cab_type;
The following completed in 5 minutes and 13 seconds (5x slower than Presto).
SELECT passenger_count,
avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;
The following completed in 10 minutes and 20 seconds (7.5x slower than Presto).
SELECT passenger_count,
year(pickup_datetime),
count(*)
FROM trips_parquet
GROUP BY passenger_count,
year(pickup_datetime);
The following completed in 16 minutes and 1 second (13x slower than Presto).
SELECT passenger_count,
year(pickup_datetime) trip_year,
round(trip_distance),
count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
year(pickup_datetime),
round(trip_distance)
ORDER BY trip_year,
trips desc;
Spark's ORC performance leaves a lot to be desired. I ran the first two benchmark queries on the trips_orc table and got back results that took 7 - 8x longer to return than their Parquet counterparts. $0.485 / hour for a Spark cluster might not be a lot of money to pay, but paying $0.485 for a query that takes the better part of an hour to complete feels like a lot. I can see how ORC is Presto's favourite file format and how Parquet is Spark's favourite.
The following completed in 33 minutes and 50 seconds (27x slower than Presto).
SELECT cab_type,
count(*)
FROM trips_orc
GROUP BY cab_type;
The following completed in 44 minutes and 29 seconds (40x slower than Presto).
SELECT passenger_count,
avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;
I'm interested in learning of any optimisations that can bring Spark's analytical performance in line with Presto's. Please drop me a line if you've got any configuration settings to share.