On February 22nd Google announced that their managed Hadoop and Spark offering "Dataproc" was out of beta and generally available. In this blog post I'll load the metadata of 1.1 billion NYC taxi journeys into Google Cloud Storage and see how fast a Dataproc cluster of five machines can query that data using Facebook's Presto as the execution engine.
UPDATE: I've written a follow up post detailing how I achieved a 33x query speed increase on Dataproc.
Up and Running on Google Cloud
When you hit the landing page for Google Cloud there is an offer to sign up and receive $300 of credits towards usage of their platform. Once I got my account setup I created a project called "taxis" which they then turned into "taxis-1273" for uniqueness.
Google Cloud SDK Up and Running
To communicate with Google Cloud I'll install a few tools on a fresh installation of Ubuntu 14.04.3 LTS.
I'll first add Google Cloud's package distribution details.
$ export CLOUD_SDK_REPO="cloud-sdk-$(lsb_release -c -s)" $ echo "deb http://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" | \ sudo tee /etc/apt/sources.list.d/google-cloud-sdk.list $ curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | \ sudo apt-key add - $ sudo apt update
The following will install, among others, the gcloud command.
$ sudo apt install git \ gcc \ google-cloud-sdk \ libffi-dev \ python-dev \ python-pip \ python-setuptools
I'll then initialise my environment. When this command was run I was given a URL to paste into a browser. The browser then asked that I login with my Google account before presenting me with a secret verification code to paste back into the CLI.
$ gcloud init
Welcome! This command will take you through the configuration of gcloud. Your current configuration has been set to: [default] To continue, you must log in. Would you like to log in (Y/n)? y Go to the following link in your browser: https://accounts.google.com/o/oauth2/auth?redirect_uri=...
Enter verification code: ... You are now logged in as: [mark@...]
After that I was asked to set a default project.
Pick cloud project to use:  [...]  [taxis-1273]
Please enter your numeric choice: 2
Your current project has been set to: [taxis-1273]. ... gcloud has now been configured!
104 GB of Data on Google Cloud Storage
There is a Python-based package called gsutil that can be used to create buckets on Google Cloud Storage and upload files. The following will install it via pip.
$ sudo pip install gsutil
I'll then authorise my account with this tool as well. It's the same process as above, you get given a long URL, you then paste it into a browser, login with your Google account and get a verification code back.
$ gsutil config
Please navigate your browser to the following URL: https://accounts.google.com/o/oauth2/auth?scope=...
Enter the authorization code: ...
Once authorised I can tell gsutil which project I'm working with.
What is your project-id? taxis-1273
With gsutil configured I'll create a bucket to store the 56 gzip files containing 1.1 billion denormalised records in CSV format. If you want to create a copy of this data please see the instructions in my Billion Taxi Rides in Redshift blog post.
$ gsutil mb \ -l EU \ gs://taxi-trips
The bucket will physically live in the EU. Google have a number of other locations to choose from if you want to pick one closer to you.
I've kept the gzip files stored in ~/taxi-trips/ on my machine. The following will upload them to the taxi-trips bucket on Google Cloud Storage using parallel uploads. When I ran this command I saw the 30 Mbit/s upload capacity I have available from my ISP was saturated for the duration of the upload.
$ screen $ gsutil \ -o GSUtil:parallel_composite_upload_threshold=150M \ -m cp \ ~/taxi-trips/*.csv.gz \ gs://taxi-trips/csv/
During the upload the files are broken up into ~60 MB chunks and stored in temporary folders in the bucket. Once uploaded, they are assembled and the files will then appear with their contents intact.
The following is what the upload process looked like in my terminal.
Copying file:///home/mark/taxi-trips/trips_xaa.csv.gz [Content-Type=text/csv]... Uploading ...util_help_cp/0f8cc0bfa8b4b45ef456ccf23c0cdf45_29: 60.32 MiB/60.32 MiB Uploading ...util_help_cp/0f8cc0bfa8b4b45ef456ccf23c0cdf45_15: 60.32 MiB/60.32 MiB ... Uploading ...sutil_help_cp/0f8cc0bfa8b4b45ef456ccf23c0cdf45_4: 60.32 MiB/60.32 MiB Uploading ...sutil_help_cp/0f8cc0bfa8b4b45ef456ccf23c0cdf45_0: 60.32 MiB/60.32 MiB
Launching a Dataproc Cluster
Google have provided example shell scripts that can be used to bootstrap Dataproc clusters. Below I'll clone their git repository, modify the Presto bootstrap script to use the latest release of Presto and then I'll upload that script to my storage bucket on Google Cloud. Note that Presto 0.144.1 is the latest release as of this writing but new versions tend to be released every two weeks.
$ git clone \ https://github.com/GoogleCloudPlatform/dataproc-initialization-actions.git $ sed -i "s/PRESTO_VERSION=\"0.126\"/PRESTO_VERSION=\"0.144.1\"/g" \ dataproc-initialization-actions/presto/presto.sh $ gsutil cp \ dataproc-initialization-actions/presto/presto.sh \ gs://taxi-trips/
I needed to create credentials for a service account and download those details as a JSON file. The file was stored as service_account.json in my home folder. I executed the following to activate the credentials:
$ gcloud auth activate-service-account \ --key-file service_account.json
Activated service account credentials for: [firstname.lastname@example.org]
I then upgraded my account from the free trial I was on so I could request quota increases.
I then requested a quota increase for the number of virtual CPU cores I could spin up. My account originally allowed for 8, I had this increased to 20.
I then synced my system's clock so that tokens passed between my machine and Google's servers would be signed with the correct timestamp.
$ sudo ntpdate ntp.ubuntu.com
With all that out of the way I could then spin up a 5-node Presto Cluster on Dataproc.
$ gcloud dataproc clusters \ create trips \ --zone europe-west1-b \ --master-machine-type n1-standard-4 \ --master-boot-disk-size 500 \ --num-workers 4 \ --worker-machine-type n1-standard-4 \ --worker-boot-disk-size 500 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --project taxis-1273 \ --initialization-actions 'gs://taxi-trips/presto.sh'
The above will create 5 machines, 4 will be worker nodes and the 5th will be the coordinator.
I've specified the bootstrap script I uploaded to Google Cloud Storage. If you run the above be sure to change the bucket name to one you own.
I've asked for all the machines to be the n1-standard-4 type with 500 GB of mechanical storage each. These come with 4 virtual CPUs and 15 GB of RAM each. It's arguable that the machines won't need 500 GB of space each so experimenting with smaller amounts of storage will make your budget go a bit further.
The cluster will be located in europe-west1-b so that it's close to the bucket where I uploaded the CSV files into.
The coordinator's name will be the name of the cluster ("trips") with a -m suffix. I executed the following to SSH into it.
$ gcloud compute ssh \ trips-m \ --zone europe-west1-b
A Billion Records in ORC Format
I've already uploaded 104 GB of CSV data gzip'ed into 56 files to Google Cloud Storage. But when working with Presto ORC-formatted data will perform much faster than data in CSV form. Below I created two tables in Hive. The first table represents the CSV data that lives in gs://taxi-trips/csv/. The second table represents where the ORC data will live.
CREATE EXTERNAL TABLE trips_csv ( trip_id INT, vendor_id VARCHAR(3), pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, store_and_fwd_flag VARCHAR(1), rate_code_id SMALLINT, pickup_longitude DECIMAL(18,14), pickup_latitude DECIMAL(18,14), dropoff_longitude DECIMAL(18,14), dropoff_latitude DECIMAL(18,14), passenger_count SMALLINT, trip_distance DECIMAL(6,3), fare_amount DECIMAL(6,2), extra DECIMAL(6,2), mta_tax DECIMAL(6,2), tip_amount DECIMAL(6,2), tolls_amount DECIMAL(6,2), ehail_fee DECIMAL(6,2), improvement_surcharge DECIMAL(6,2), total_amount DECIMAL(6,2), payment_type VARCHAR(3), trip_type SMALLINT, pickup VARCHAR(50), dropoff VARCHAR(50), cab_type VARCHAR(6), precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT, max_temperature SMALLINT, min_temperature SMALLINT, average_wind_speed SMALLINT, pickup_nyct2010_gid SMALLINT, pickup_ctlabel VARCHAR(10), pickup_borocode SMALLINT, pickup_boroname VARCHAR(13), pickup_ct2010 VARCHAR(6), pickup_boroct2010 VARCHAR(7), pickup_cdeligibil VARCHAR(1), pickup_ntacode VARCHAR(4), pickup_ntaname VARCHAR(56), pickup_puma VARCHAR(4), dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel VARCHAR(10), dropoff_borocode SMALLINT, dropoff_boroname VARCHAR(13), dropoff_ct2010 VARCHAR(6), dropoff_boroct2010 VARCHAR(7), dropoff_cdeligibil VARCHAR(1), dropoff_ntacode VARCHAR(4), dropoff_ntaname VARCHAR(56), dropoff_puma VARCHAR(4) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://taxi-trips/csv/'; CREATE EXTERNAL TABLE trips_orc ( trip_id INT, vendor_id STRING, pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, store_and_fwd_flag STRING, rate_code_id SMALLINT, pickup_longitude DOUBLE, pickup_latitude DOUBLE, dropoff_longitude DOUBLE, dropoff_latitude DOUBLE, passenger_count SMALLINT, trip_distance DOUBLE, fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE, tip_amount DOUBLE, tolls_amount DOUBLE, ehail_fee DOUBLE, improvement_surcharge DOUBLE, total_amount DOUBLE, payment_type STRING, trip_type SMALLINT, pickup STRING, dropoff STRING, cab_type STRING, precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT, max_temperature SMALLINT, min_temperature SMALLINT, average_wind_speed SMALLINT, pickup_nyct2010_gid SMALLINT, pickup_ctlabel STRING, pickup_borocode SMALLINT, pickup_boroname STRING, pickup_ct2010 STRING, pickup_boroct2010 STRING, pickup_cdeligibil STRING, pickup_ntacode STRING, pickup_ntaname STRING, pickup_puma STRING, dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel STRING, dropoff_borocode SMALLINT, dropoff_boroname STRING, dropoff_ct2010 STRING, dropoff_boroct2010 STRING, dropoff_cdeligibil STRING, dropoff_ntacode STRING, dropoff_ntaname STRING, dropoff_puma STRING ) STORED AS orc LOCATION 'gs://taxi-trips/orc/';
I'll then issue a command to load all the CSV files into the trips_csv table (the data already exists so no files will be touched) and then load all that data into the trips_orc table. When that data is loaded in files will appear in gs://taxi-trips/orc on Google Cloud Storage.
echo "LOAD DATA INPATH 'gs://taxi-trips/csv' INTO TABLE trips_csv; INSERT INTO TABLE trips_orc SELECT * FROM trips_csv;" | hive
The above took 3 hours to complete.
This is a one-off exercise. From now on I can just create the trips_orc table and point it to gs://taxi-trips/orc/ when I need to interact with that data.
The bootstrap script named the CLI tool for Presto simply "presto" instead of "presto-cli" (as seen on other setups).
$ presto \ --catalog hive \ --schema default
The following completed in 8 minutes and 46 seconds.
SELECT cab_type, count(*) FROM trips_orc GROUP BY cab_type;
Query 20160411_125559_00002_3nhch, FINISHED, 4 nodes Splits: 1,409 total, 1,409 done (100.00%) 8:46 [1.11B rows, 234MB] [2.12M rows/s, 457KB/s]
The following completed in 8 minutes and 43 seconds.
SELECT passenger_count, avg(total_amount) FROM trips_orc GROUP BY passenger_count;
Query 20160411_130501_00003_3nhch, FINISHED, 4 nodes Splits: 1,409 total, 1,409 done (100.00%) 8:43 [1.11B rows, 3.89GB] [2.13M rows/s, 7.61MB/s]
The following completed in 8 minutes and 31 seconds.
SELECT passenger_count, year(pickup_datetime), count(*) FROM trips_orc GROUP BY passenger_count, year(pickup_datetime);
Query 20160411_131359_00004_3nhch, FINISHED, 4 nodes Splits: 1,409 total, 1,409 done (100.00%) 8:31 [1.11B rows, 6.9GB] [2.18M rows/s, 13.8MB/s]
The following completed in 8 minutes and 26 seconds.
SELECT passenger_count, year(pickup_datetime) trip_year, round(trip_distance), count(*) trips FROM trips_orc GROUP BY passenger_count, year(pickup_datetime), round(trip_distance) ORDER BY trip_year, trips desc;
Query 20160411_132242_00005_3nhch, FINISHED, 4 nodes Splits: 1,409 total, 1,409 done (100.00%) 8:26 [1.11B rows, 9.24GB] [2.2M rows/s, 18.7MB/s]
I'm assuming some of the configuration used in the bootstrap scripts might have had an effect on the query durations. At some point I'm going to see if I can piece together a configuration that is better balanced for the n1-standard-4 machines.
The data that lives on Google Cloud Storage will probably eat up the better part of 200 GB so that will set me back $5.20 / month.
The cluster itself is decoupled from the storage bucket and only needs to be launched when I need to run queries. Therefore I can look at its costs on an ad-hoc basis. Each time I launch a 5-node cluster with the above machine types in Europe I should be billed $1 an hour for the compute power needed for the coordinator and worker nodes. The 2,500 GB of mechanical disk space across the 5 nodes comes to $0.14 / hour. There will also be a $0.20 / hour service fee for using Dataproc. So the total hourly cost for the compute resources should come out to $1.14.
Google Cloud also has spot instances that they call "preemptibles", these can bring the compute costs down by 70%.
The cluster billing granularity is by the minute so if you only use the cluster for an hour and 15 minutes, you'll only be billed for an hour and 15 minutes. There is a 10-minute minimum charge though.