Posted on Sat 13 February 2016

A Billion Taxi Rides in Redshift

On November 17th, 2015, Todd W Schneider published a blog post titled Analyzing 1.1 Billion NYC Taxi and Uber Trips, with a Vengeance in which he analysed the metadata of 1.1 billion Taxi journeys made in New York City between 2009 and 2015. Included with this work was a link to a GitHub repository where he published the SQL, Shell and R files he used in his work and instructions on how to get everything up and running.

There was a question in the README.md file that struck me: "Why not use BigQuery or Redshift?" to which Todd replied, "Google BigQuery and Amazon Redshift would probably provide significant performance improvements over PostgreSQL."

Truth is, outside of geospatial-specific queries, many columnar-based store engines would be a benefit to this dataset in terms of query performance. In this blog post I'll look at getting the raw data from it's original sources, denormalising it and importing it into a Redshift Data Warehouse on AWS.

UPDATE: This blog post uses Redshift's free tier instance type which doesn't have enough capacity to store the entire dataset. I've since published a blog post where I use a ds2.xlarge instance which can store all 1.1 billion records.

Your Disk Drive is the Bottleneck

The high water mark of disk space usage for the data alone in this exercise is ~600 GB. If your going to run anything in this blog post on your machine I recommend doing so on an SSD drive. If you don't have an SSD drive with 640 GB of space available I'd suggest launching an EC2 instance on AWS with 640 GB of SSD-backed storage or launch a 640 GB droplet on Digital Ocean.

Why Not Skip PostgreSQL & Import the Raw Data Directly?

Todd's scripts pull 6 datasets, each with their own schemas and in some cases, data which needs to be cleaned up and transformed before it can be of use. Redshift isn't a great place to do transformations on data, it really should be cleaned up before it arrives.

Gathering 190 GB of Data

The following was done on a fresh install of Ubuntu 14.04.3 LTS with 1 TB of drive space.

The first thing I need to do is install PostgreSQL 9.3, Postgis 2.1, git and unzip.

$ sudo apt-get update
$ sudo apt-get install \
    git \
    postgresql-9.3-postgis-2.1 \
    unzip

With those in place I can clone Todd's repo and begin to download the raw data. I've grouped all the URLs into a single call and use 6 concurrent instances of wget to download the data.

$ git clone https://github.com/toddwschneider/nyc-taxi-data.git
$ cd nyc-taxi-data
$ cat raw_uber_data_urls.txt \
      raw_data_urls.txt | \
      xargs -n 1 -P 6 wget -P data/ &

The data takes ~190 GB of space so you'll want to run this on a fast internet connection with an equally fast disk drive to store it all on. I spent my Saturday afternoon mostly looking at the following:

yellow_tripdata_2010-02.csv  60%[=========================>                  ]   1.15G  3.53MB/s   eta 4m 11s
yellow_tripdata_2010-03.csv  40%[================>                           ] 908.90M  3.27MB/s   eta 7m 9s
yellow_tripdata_2010-04.csv  34%[==============>                             ] 905.32M  3.01MB/s   eta 9m 9s
yellow_tripdata_2010-05.csv  30%[============>                               ] 813.28M  2.35MB/s   eta 9m
...

Importing 190 GB of Data into PostgreSQL

If you haven't assigned permissions to your Linux user account in your PostgreSQL installation then you'll need to do so. Replace 'mark' below in the two spots it appears in with your user name (run whoami to find out what your user name is).

$ sudo su - postgres -c \
    "psql -c 'CREATE USER mark; ALTER USER mark WITH SUPERUSER;'"

To start the import process you'll need to initialize the database and then begin importing the trip data for the Green and Yellow Taxi datasets.

$ ./initialize_database.sh
$ ./import_trip_data.sh

I ran the import process on a mechanical drive and it took 72 hours to import. I'd estimate 10% of the import process was bottlenecked by my CPU while indices were being created and the remaining 90% was bottlenecked by having both the database and raw data sat on the same drive and having an import process suffer from slow, random I/O.

The following will import the 2014 and 2015 data for Uber:

$ ./import_uber_trip_data.sh

Todd's repo comes with scripts that wrap up a lot of ETL complexity and they work very well out of the box. I only came across two issues when importing the data into PostgreSQL (one issue was resolved prior to publication of this blog post). During the import phase for the Uber 2014 data I came across the following error message:

ERROR:  date/time field value out of range: "4/13/2014 0:01:00"
HINT:  Perhaps you need a different "datestyle" setting.
CONTEXT:  COPY uber_trips_staging, line 15076, column pickup_datetime: "4/13/2014 0:01:00"

This same issue came up with every file in the Uber 2014 dataset but it appears to only have affected a minority of records. I raised an issue regarding this, hopefully there is a transformation process that can clean this data up.

When this process is complete you should have 321 GB of drive space taken up by your PostgreSQL database.

$ sudo su - postgres -c "du -hs ~/9.3/main"
321G    /var/lib/postgresql/9.3/main

PostgreSQL should also be reporting row counts that come close to the following:

$ psql nyc-taxi-data
SELECT relname table_name,
       lpad(to_char(reltuples, 'FM9,999,999,999'), 13) row_count
FROM pg_class
LEFT JOIN pg_namespace
    ON (pg_namespace.oid = pg_class.relnamespace)
WHERE nspname NOT IN ('pg_catalog', 'information_schema')
AND relkind = 'r'
ORDER BY reltuples DESC;
              table_name               |   row_count
---------------------------------------+---------------
 trips                                 | 1,120,559,744
 uber_trips_2015                       |    14,270,497
 spatial_ref_sys                       |         3,911
 central_park_weather_observations_raw |         2,372
 nyct2010                              |         2,167
 uber_taxi_zone_lookups                |           265
 yellow_tripdata_staging               |             0
 cab_types                             |             0
 uber_trips_staging                    |             0
 green_tripdata_staging                |             0

Keep in mind these row counts are estimates. Anything less than a SELECT COUNT(*) FROM <table> will be less than authoritative for getting row counts from PostgreSQL. The reason why I didn't just run a count on the trips table is that PostgreSQL needs to do a sequential scan which very slow when looking at a billion+ records on a mechanical drive.

What's up with the 14M Uber Records for 2015?

The number of properties differ so greatly between the 2014 and 2015 dumps of data from Uber that Todd decided the 2015 data gets to go live in it's own table.

There is enough information missing from the 2015 dump that I've decided to not include it in the denormalising process.

If you wish, this data can be exported separately at a later point and imported into your data store of choice.

Exporting 1.1 Billion Rows

I will now create a file in which PostgreSQL can dump the results of the denormalising query into in CSV format. I want to break the 1.1 billion rows up into 2GB, gzip-compressed files. These should be the optimal size for loading into Redshift in parallel. If you look at this example line I exported (by appending LIMIT 1 to the query below) you'll see it's 484 bytes long.

27767932,CMT,2009-01-04 04:02:25,2009-01-04 04:09:59,,,-73.991838999999999,40.722442999999998,-73.998007000000001,40.740388000000003,1,1.8999999999999999,7.7999999999999998,0,,20,0,,,27.800000000000001,Credit,,0101000020E61000006EF9484A7A7F52C0B01C2103795C4440,0101000020E6100000768BC058DF7F52C0E92CB308C55E4440,yellow,0,0,0,56,-39,34,1473,36.01,1,Manhattan,003601,1003601,E,MN27,Chinatown,3809,1165,81,1,Manhattan,008100,1008100,I,MN13,Hudson Yards-Chelsea-Flatiron-Union Square,3807

If that's the average length of a row when in CSV format then the whole dump should produce around ~500 GB of raw data. If gzip offers us a 5:1 compression ratio then ~500 GB of raw data should be 100 GB when compressed. I can take 1.1 billion row count and divide by 50 and that will give me 22 million lines per 2 GB gzip file. If I round that down a bit in order to not expect too much from the compression ratio that leaves me with 20 million lines per CSV creating ~50 - 60 x 2 GB gzip files.

Before running the dump consider taking the 190 GB of raw data in the nyc-taxi-data/data folder off your drive in order to ensure you don't run out of space while exporting this data. The 190 GB of raw data is likely larger than the compressed, denormalised data that's being exported and not having it on your working drive means you shouldn't have to worry about drive capacity at this point.

Before I export from PostgreSQL I'll create a folder that PostgreSQL has access to where it'll store the gzip files.

$ mkdir -p /home/mark/nyc-taxi-data/trips
$ sudo chown -R postgres:postgres \
    /home/mark/nyc-taxi-data/trips

I'll then connect to PostgreSQL:

$ psql nyc-taxi-data

Replace the /home/mark/nyc-taxi-data/trips path at the bottom of the query below with the complete path you've created. Remember that PostgreSQL demands the output path be absolute and not relative.

COPY (
    SELECT trips.id,
           trips.vendor_id,
           trips.pickup_datetime,
           trips.dropoff_datetime,
           trips.store_and_fwd_flag,
           trips.rate_code_id,
           trips.pickup_longitude,
           trips.pickup_latitude,
           trips.dropoff_longitude,
           trips.dropoff_latitude,
           trips.passenger_count,
           trips.trip_distance,
           trips.fare_amount,
           trips.extra,
           trips.mta_tax,
           trips.tip_amount,
           trips.tolls_amount,
           trips.ehail_fee,
           trips.improvement_surcharge,
           trips.total_amount,
           trips.payment_type,
           trips.trip_type,
           trips.pickup,
           trips.dropoff,

           cab_types.type cab_type,

           weather.precipitation_tenths_of_mm rain,
           weather.snow_depth_mm,
           weather.snowfall_mm,
           weather.max_temperature_tenths_degrees_celsius max_temp,
           weather.min_temperature_tenths_degrees_celsius min_temp,
           weather.average_wind_speed_tenths_of_meters_per_second wind,

           pick_up.gid pickup_nyct2010_gid,
           pick_up.ctlabel pickup_ctlabel,
           pick_up.borocode pickup_borocode,
           pick_up.boroname pickup_boroname,
           pick_up.ct2010 pickup_ct2010,
           pick_up.boroct2010 pickup_boroct2010,
           pick_up.cdeligibil pickup_cdeligibil,
           pick_up.ntacode pickup_ntacode,
           pick_up.ntaname pickup_ntaname,
           pick_up.puma pickup_puma,

           drop_off.gid dropoff_nyct2010_gid,
           drop_off.ctlabel dropoff_ctlabel,
           drop_off.borocode dropoff_borocode,
           drop_off.boroname dropoff_boroname,
           drop_off.ct2010 dropoff_ct2010,
           drop_off.boroct2010 dropoff_boroct2010,
           drop_off.cdeligibil dropoff_cdeligibil,
           drop_off.ntacode dropoff_ntacode,
           drop_off.ntaname dropoff_ntaname,
           drop_off.puma dropoff_puma
    FROM trips
    LEFT JOIN cab_types
        ON trips.cab_type_id = cab_types.id
    LEFT JOIN central_park_weather_observations_raw weather
        ON weather.date = trips.pickup_datetime::date
    LEFT JOIN nyct2010 pick_up
        ON pick_up.gid = trips.pickup_nyct2010_gid
    LEFT JOIN nyct2010 drop_off
        ON drop_off.gid = trips.dropoff_nyct2010_gid
) TO PROGRAM
    'split -l 20000000 --filter="gzip > /home/mark/nyc-taxi-data/trips/trips_\$FILE.csv.gz"'
    WITH CSV;

When this completes you'll have your first accurate count of the number of records in the trips table printed out:

COPY 1113653018

I'll set the ownership of the trips folder back to my user so I can work with it without restrictions.

$ sudo chown -R mark:mark /home/mark/nyc-taxi-data/trips

Uploading in Parallel to S3

I now want to upload the gzip files to AWS S3. There are a number of utilities I'll install to make this possible.

$ sudo apt-get update
$ sudo apt-get install \
    python-pip \
    python-virtualenv \
    parallel
$ virtualenv redshift
$ source redshift/bin/activate

$ pip install \
    awscli \
    https://github.com/s3tools/s3cmd/archive/v1.6.1.zip#egg=s3cmd

Make sure the AWS S3 bucket you upload to is in the same region you want to launch your Redshift instance in.

$ s3cmd --configure
...
Default Region [US]: US
...

This will create a new S3 bucket I can use to upload the data to.

$ s3cmd mb s3://trips_metadata_example
Bucket 's3://trips_metadata_example/' created

This will upload all the gzip files 8 at a time.

$ cd trips
$ find trips_x[a-c]*.csv.gz | \
    parallel -j8 \
    s3cmd put {/} s3://trips_metadata_example/ \
        --progress

If you average 1 MB/s in upload speeds to S3 it'll take around 30 hours to upload all ~105 GB of data.

Redshift needs a manifest of the files we'll be loading in.

$ vi trips.manifest
{
    "entries": [
        {"url": "s3://trips_metadata_example/trips_xaa.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xab.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xac.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xad.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xae.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xaf.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xag.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xah.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xai.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xaj.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xak.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xal.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xam.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xan.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xao.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xap.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xaq.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xar.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xas.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xat.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xau.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xav.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xaw.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xax.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xay.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xaz.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xba.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbb.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbc.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbd.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbe.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbf.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbg.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbh.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbi.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbj.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbk.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbl.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbm.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbn.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbo.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbp.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbq.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbr.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbs.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbt.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbu.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbv.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbw.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbx.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xby.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xbz.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xca.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xcb.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xcc.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xcd.csv.gz", "mandatory": true}
    ]
}

The manifest itself will also need to live on S3.

$ s3cmd put trips.manifest s3://trips_metadata_example/

Sizing up a Redshift Cluster

When you first enter the Redshift section of the AWS console you may see the following:

If you've never created an Amazon Redshift cluster, you're eligible for a two month free trial of our dc1.large node. You get 750 hours per month for free, enough hours to continuously run one dc1.large node with 160GB of compressed SSD storage.

So cost shouldn't be much of a worry if you're just trying this out for the first time.

A Redshift Cluster can have anywhere from 1 to 128 compute nodes. Each of these nodes have anywhere from 2 to 16 slices. Each slice allows Redshift to ingest up to ~6.5 MB/s of data. So the more nodes you have and the more slices per node you have the faster you can ingest data into Redshift.

If we go with the free, single-node dc1.large cluster type we should expect that a single 2GB gzip file will load into the trips table in about 5 - 6 minutes. If that rate is sustained for the whole dataset it should finish in about 6 - 8 hours.

Loading 1/5th of a Billion Records into Redshift

The only down side with the dc1.large cluster type is that it will only have enough drive space to load around 200M records. The cluster needs free space of at least 2.5x (minimum, not maximum) the incoming data size to use as temporary storage space for sorting the data in each table. When we import 10 gzip files containing 200M records, at one point we will use 95% of the drive capacity available. When the import is complete the disk usage will drop down to 22%. It is possible to break the import up into small jobs that run in sequence instead of in parallel in which case you should be able to fit more data into the a dc1.large cluster.

The cheapest, single-node cluster type that can accept ~500 GB of uncompressed data and sort it will be the ds2.xlarge which has 2 TB of mechanical storage and runs for $0.85 / hour on us-east-1. To contrast, the single-node, dc1.8xlarge has 2.56 TB of SSD-backed storage and runs for $4.80 / hour in the same region.

Redshift is I/O-bound and SSDs can often be 5-20x faster than mechanical drives. With that in mind consider if the $3.95 / hour savings is worth more than your Analyst's time.

For the purposes of keeping costs at $0 and time boxing this blog writting exercise to a week of my time I'll be loading 200M records into a dc1.large cluster type.

Redshift Up and Running

Below I'll set the environment variables for my AWS access credentials and set the master username and password for my Redshift instance.

$ read AWS_ACCESS_KEY_ID
$ read AWS_SECRET_ACCESS_KEY
$ export AWS_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY
$ read MASTER_USERNAME
$ read MASTER_PASSWORD
$ export MASTER_USERNAME
$ export MASTER_PASSWORD

Before creating the Redshift cluster make sure the AWS CLI tool's default region is the same region your S3 bucket is located in. You can run the configure command to update any settings and ensure they're as you expect.

$ aws configure
...
Default region name [None]: us-east-1
...

The following will launch a single-node, dc1.large data warehouse with 7 EC2 compute units, 15GB of RAM and 160GB of SSD-backed storage.

$ aws redshift create-cluster \
    --db-name trips \
    --cluster-type single-node \
    --node-type dc1.large \
    --master-username $MASTER_USERNAME \
    --master-user-password $MASTER_PASSWORD \
    --publicly-accessible \
    --cluster-identifier trips-data \
    --availability-zone us-east-1a

This will create a Redshift cluster with your default security group. To be able to connect to Redshift, add your IP address you're connecting to Redshift from to your default security group.

It should take a few minutes for the cluster to become available. The following will refresh the status of the cluster every ten seconds. When you see the status as available you should be able to connect to it.

$ watch -n10 aws redshift describe-clusters

You can use the PostgreSQL interactive terminal to connect to Redshift.

$ PGPASSWORD=$MASTER_PASSWORD \
    psql -h trips-data.cttuaolixpsz.us-east-1.redshift.amazonaws.com \
         -p 5439 \
         -U $MASTER_USERNAME trips

The following will create the table for the denormalised trips data.

CREATE TABLE trips (
    trip_id                 INTEGER NOT NULL DISTKEY ENCODE LZO,
    vendor_id               VARCHAR(3) ENCODE LZO,

    -- Sort keys shouldn't be encoded (compressed)
    pickup_datetime         TIMESTAMP NOT NULL,

    dropoff_datetime        TIMESTAMP NOT NULL ENCODE LZO,
    store_and_fwd_flag      VARCHAR(1) ENCODE RUNLENGTH,
    rate_code_id            SMALLINT NOT NULL ENCODE LZO,
    pickup_longitude        DECIMAL(18,14) ENCODE MOSTLY8,
    pickup_latitude         DECIMAL(18,14) ENCODE MOSTLY8,
    dropoff_longitude       DECIMAL(18,14) ENCODE MOSTLY8,
    dropoff_latitude        DECIMAL(18,14) ENCODE MOSTLY8,
    passenger_count         SMALLINT NOT NULL DEFAULT '0' ENCODE LZO,
    trip_distance           DECIMAL(6,3) DEFAULT '0.0' ENCODE MOSTLY8,
    fare_amount             DECIMAL(6,2) DEFAULT '0.0' ENCODE MOSTLY8,
    extra                   DECIMAL(6,2) DEFAULT '0.0' ENCODE MOSTLY8,
    mta_tax                 DECIMAL(6,2) DEFAULT '0.0' ENCODE MOSTLY8,
    tip_amount              DECIMAL(6,2) DEFAULT '0.0' ENCODE MOSTLY8,
    tolls_amount            DECIMAL(6,2) DEFAULT '0.0' ENCODE MOSTLY8,
    ehail_fee               DECIMAL(6,2) DEFAULT '0.0' ENCODE MOSTLY8,
    improvement_surcharge   DECIMAL(6,2) DEFAULT '0.0' ENCODE MOSTLY8,
    total_amount            DECIMAL(6,2) DEFAULT '0.0' ENCODE MOSTLY8,
    payment_type            VARCHAR(3) ENCODE RUNLENGTH,
    trip_type               SMALLINT ENCODE LZO,
    pickup                  VARCHAR(50) ENCODE LZO,
    dropoff                 VARCHAR(50) ENCODE LZO,

    cab_type                VARCHAR(6) NOT NULL ENCODE LZO,

    precipitation           SMALLINT DEFAULT '0' ENCODE LZO,
    snow_depth              SMALLINT DEFAULT '0' ENCODE LZO,
    snowfall                SMALLINT DEFAULT '0' ENCODE LZO,
    max_temperature         SMALLINT DEFAULT '0' ENCODE LZO,
    min_temperature         SMALLINT DEFAULT '0' ENCODE LZO,
    average_wind_speed      SMALLINT DEFAULT '0' ENCODE LZO,

    pickup_nyct2010_gid     SMALLINT ENCODE LZO,
    pickup_ctlabel          VARCHAR(10) ENCODE LZO,
    pickup_borocode         SMALLINT ENCODE LZO,
    pickup_boroname         VARCHAR(13) ENCODE LZO,
    pickup_ct2010           VARCHAR(6) ENCODE LZO,
    pickup_boroct2010       VARCHAR(7) ENCODE LZO,
    pickup_cdeligibil       VARCHAR(1) ENCODE RUNLENGTH,
    pickup_ntacode          VARCHAR(4) ENCODE LZO,
    pickup_ntaname          VARCHAR(56) ENCODE LZO,
    pickup_puma             VARCHAR(4) ENCODE LZO,

    dropoff_nyct2010_gid    SMALLINT ENCODE LZO,
    dropoff_ctlabel         VARCHAR(10) ENCODE LZO,
    dropoff_borocode        SMALLINT ENCODE LZO,
    dropoff_boroname        VARCHAR(13) ENCODE LZO,
    dropoff_ct2010          VARCHAR(6) ENCODE LZO,
    dropoff_boroct2010      VARCHAR(7) ENCODE LZO,
    dropoff_cdeligibil      VARCHAR(1) ENCODE RUNLENGTH,
    dropoff_ntacode         VARCHAR(4) ENCODE LZO,
    dropoff_ntaname         VARCHAR(56) ENCODE LZO,
    dropoff_puma            VARCHAR(4) ENCODE LZO,

    primary key(trip_id)
) sortkey(pickup_datetime);

Loading Records into Redshift

For demonstration purposes I'll be running a single load into Redshift. Because I'll need 95% of the drive capacity during the import process I'll just be importing the first 10 gzip files. If you wish to import all the gzip files in parallel you will need a larger cluster type as discussed above. I've amended my trips.manifest file with the following contents and uploaded it to S3.

{
    "entries": [
        {"url": "s3://trips_metadata_example/trips_xaa.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xab.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xac.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xad.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xae.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xaf.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xag.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xah.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xai.csv.gz", "mandatory": true},
        {"url": "s3://trips_metadata_example/trips_xaj.csv.gz", "mandatory": true}
    ]
}
$ s3cmd put trips.manifest s3://trips_metadata_example/

I'll now execute the copy command which will begin loading in the data from the 10 gzip files. Be sure to add in your access and secret keys to the fourth line in the SQL command below.

$ PGPASSWORD=$MASTER_PASSWORD \
    psql -h trips-data.cttuaolixpsz.us-east-1.redshift.amazonaws.com \
         -p 5439 \
         -U $MASTER_USERNAME trips
COPY trips
  FROM 's3://trips_metadata_example/trips.manifest'
  CREDENTIALS
    'aws_access_key_id=...;aws_secret_access_key=...'
  DELIMITER ','
  EMPTYASNULL
  ESCAPE
  GZIP
  MANIFEST
  MAXERROR 10
  REMOVEQUOTES
  TRIMBLANKS
  TRUNCATECOLUMNS;

Metrics from the Import

The 200M records loaded in 1 hour, 4 minutes and 25 seconds. During the import there were two distinct phases that manifested themselves in the CloudWatch screens.

During the first phase the disk was just being written to for the most part. I saw 343 Write IOPS being consumed with 43 MB/s being written to disk.

In the second phase the write IOPS shot up to 1,275 and write throughput hit 167 MB/s. Read IOPS, which had been non-existent in the first phase, hit 1,461 with a read throughput of 191 MB/s.

The high water mark of disk usage during the load was 95% but this dropped down to 22% when the load was completed.

Examining the Trips

The first thing I want to do when the import is complete is see if the record count matches what I expected.

SELECT COUNT(*) FROM trips;
   count
-----------
 200000000

Though I haven't yet run a vacuum operation I'll run a few quick queries to see what the data looks like. Here I'm asking what the average cost per mile was broken down by year and cab type.

SELECT EXTRACT(year from pickup_datetime),
       cab_type,
       AVG(fare_amount / trip_distance)
FROM trips
WHERE trip_distance > 0
GROUP BY 1, 2
ORDER BY 1, 2;
 date_part | cab_type |   avg
-----------+----------+----------
      2011 | yellow   | 5.871679
      2012 | yellow   | 5.636755

The average cost of a Yellow Cab journey in 2011 worked out to US$5.87 / mile (€4.19 / KM) which is ~9 times higher than what I usually paid for taxi journeys here in Estonia in 2015.

The sad thing with the 10 gzip files that were loaded in is that they didn't have any Green Cab or Uber data within them. It would have been nice to have randomised the records when they were dumped out from PostgreSQL so we could have had a sampling from across the dataset.

The following looks at the average journey length in miles broken down by year.

SELECT EXTRACT(year from pickup_datetime),
       AVG(trip_distance)
FROM trips
WHERE trip_distance > 0
GROUP BY 1
ORDER BY 1;
 date_part |  avg
-----------+-------
      2011 | 2.874
      2012 | 2.869

House Keeping

To make sure the data is sorted properly in your trips table run the vacuum command.

VACUUM trips;

The Reshift schema I put together was an educated guess as to the best encoding (read: compression) schemes to use on each column. Redshift has an ANALYZE COMPRESSION command where it will take 100,000 records per slice and compress each column in your table individually using all supported encoding schemes. It'll then report which encoding scheme used the least amount of drive space for each column. To run the command execute the following:

ANALYZE COMPRESSION trips;

Dodgy Data

Surprise, a billion records from a government institution might have a few errors here and there. To be honest, I was surprised I didn't see more issues than I did with this dataset.

When I first began loading the data into Redshift I saw the following a few times.

ERROR:  Load into table 'trips' failed.  Check 'stl_load_errors' system table for details.

I made the schema field types more accommodating to accept records that had values outside of their norm (like a pickup longitude of -2618.54486).

The copy command I used will allow 10 errors to occur during the import before aborting and it's likely that there could be more than 10 errors if I finished importing the remaining ~900M records.

If you do decide to import all the data the following will help you read the latest error messages a little easier.

\x on

SELECT *
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 1;
Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

© Copyright 2014 - 2016 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.