Posted on Sun 27 March 2016

A Billion Taxi Rides in PostgreSQL

Over the past six weeks I've been looking at importing the metadata of 1.1 billion taxi journeys made in New York City between 2009 and 2015 into various data stores. Each of these data stores has been aimed more towards analytical than transactional workloads.

Though PostgreSQL does support analytical operations, there isn't any automatic aggregation of field values out of the box. Data cubes need to be built manually, and indices are used to track down or exclude blocks of data rather than to provide compact summaries that can answer aggregate queries. As a result, aggregate query performance is proportional to the amount of data being looked at.

To answer an aggregate query that touches every row, PostgreSQL will need to scan either the entire table or the entirety of an index that includes all rows in the table.

PostgreSQL's default storage layout stores data in an uncompressed form, so reading 100 MB/s of data off a disk during a sequential scan won't yield the same number of rows per second as it would from a gzip-compressed CSV file. The 104 GB of denormalised, compressed CSV data I've been using throughout this series of blog posts turns into 281 GB when imported into PostgreSQL.

$ sudo su - postgres -c \
    "du -hs ~/9.5/main/* | grep '[0-9]G'"
281G    /var/lib/postgresql/9.5/main/base
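
As a quick check on those two figures, the sketch below works out how much the data grows once it's imported. The variable names are my own; the sizes are the ones quoted above.

```shell
# Sizes quoted above, in GB.
csv_gb=104   # gzip-compressed, denormalised CSV
pg_gb=281    # PostgreSQL's base directory after the import

# awk handles the floating-point division; LC_ALL=C keeps "." as the decimal point.
growth=$(LC_ALL=C awk -v pg="$pg_gb" -v csv="$csv_gb" 'BEGIN { printf "%.1f", pg / csv }')

echo "The imported data is ${growth}x the size of the compressed CSV files."
```

So a sequential scan has to read roughly 2.7 times as many bytes as there are in the compressed source files.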

There are two tools that I'll look at to address these issues. The first is the new parallel aggregate support that should be appearing in PostgreSQL 9.6 and the second is the columnar store extension called cstore_fdw from Citus Data.

Please note, at this time AWS RDS only supports PostgreSQL 9.4 so parallel aggregate support is out of the question. The cstore_fdw extension isn't supported either so you would need to run your own PostgreSQL instances on EC2 to use either of these features if you're planning on using AWS.

Install PostgreSQL from Source

Parallel aggregation support was only added to the master branch of PostgreSQL six days ago so I'm going to compile the master branch in order to try out this feature.

The following was run on a fresh install of Ubuntu 14.04.3 LTS. The machine it was run on has the following key components:

  • An Intel Core i5-4670K Haswell Quad-Core 3.4 GHz Processor
  • 16 GB of RAM
  • An 850 GB SSD drive that can perform random I/O up to 98K IOPS

First I'll install the dependencies needed to compile PostgreSQL.

$ sudo apt-get install \
    bison \
    flex \
    git \
    libreadline-dev \
    libz-dev \
    make

Then I'll check out the codebase at a specific commit ID so that these steps are easier to reproduce.

$ git clone git://git.postgresql.org/git/postgresql.git
$ cd postgresql
$ git checkout 676265

The following will compile and install PostgreSQL and its contributed modules:

$ ./configure && \
    make -j4 && \
    sudo make install

$ cd contrib
$ make -j4 && \
    sudo make install

I'll then add a postgres user to the system, create a directory for it to store its data and initialise that data directory.

$ sudo adduser postgres

$ sudo mkdir /usr/local/pgsql/data
$ sudo chown postgres -R \
    /usr/local/pgsql/data

$ sudo su - postgres -c \
    "/usr/local/pgsql/bin/initdb -D /usr/local/pgsql/data"

I'll then install the control script, make sure PostgreSQL launches when the system does and then start PostgreSQL's server.

$ sudo cp ~/postgresql/contrib/start-scripts/linux \
    /etc/init.d/postgresql

$ sudo chmod +x /etc/init.d/postgresql
$ sudo update-rc.d postgresql defaults

$ sudo /etc/init.d/postgresql start

I'll set the PATH environment variable for my account so that PostgreSQL's binaries directory is searched before any others when calling pathless commands.

$ PATH="/usr/local/pgsql/bin/:$PATH"
$ export PATH
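
To illustrate why the order matters, here is a small sketch that can be run anywhere. It uses a temporary directory with a dummy psql script as a stand-in for /usr/local/pgsql/bin (both the directory and the dummy script are hypothetical) and asks the shell which file a pathless psql would resolve to.

```shell
# Hypothetical stand-in for /usr/local/pgsql/bin so this can run anywhere.
fake_bin=$(mktemp -d)
printf '#!/bin/sh\necho "psql from %s"\n' "$fake_bin" > "$fake_bin/psql"
chmod +x "$fake_bin/psql"

# Prepending means this directory is searched before the rest of PATH.
PATH="$fake_bin:$PATH"
export PATH

# command -v prints the path the shell would execute for a pathless command.
resolved=$(command -v psql)
echo "$resolved"

# Remove the scratch directory again.
rm -rf "$fake_bin"
```

Running command -v psql after the real PATH change should, by the same logic, print a path under /usr/local/pgsql/bin.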

I'll then add permissions for my user account to create databases:

$ sudo su - postgres -c \
    'echo "CREATE USER mark;
           ALTER USER mark WITH SUPERUSER;" | /usr/local/pgsql/bin/psql'

Importing 1.1 Billion Records into PostgreSQL

The following will create a trips database that will contain the taxi trip metadata.

$ createdb trips

I'll then create a table that will store all the taxi trip metadata in a denormalised form.

$ psql trips
CREATE TABLE trips (
    trip_id                 SERIAL,
    vendor_id               VARCHAR(3),

    pickup_datetime         TIMESTAMP NOT NULL,

    dropoff_datetime        TIMESTAMP NOT NULL,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT NOT NULL,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT NOT NULL DEFAULT '0',
    trip_distance           DECIMAL(6,3) DEFAULT '0.0',
    fare_amount             DECIMAL(6,2) DEFAULT '0.0',
    extra                   DECIMAL(6,2) DEFAULT '0.0',
    mta_tax                 DECIMAL(6,2) DEFAULT '0.0',
    tip_amount              DECIMAL(6,2) DEFAULT '0.0',
    tolls_amount            DECIMAL(6,2) DEFAULT '0.0',
    ehail_fee               DECIMAL(6,2) DEFAULT '0.0',
    improvement_surcharge   DECIMAL(6,2) DEFAULT '0.0',
    total_amount            DECIMAL(6,2) DEFAULT '0.0',
    payment_type            VARCHAR(6),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6) NOT NULL,

    precipitation           SMALLINT DEFAULT '0',
    snow_depth              SMALLINT DEFAULT '0',
    snowfall                SMALLINT DEFAULT '0',
    max_temperature         SMALLINT DEFAULT '0',
    min_temperature         SMALLINT DEFAULT '0',
    average_wind_speed      SMALLINT DEFAULT '0',

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4),

    CONSTRAINT pk_trips PRIMARY KEY (trip_id)
);

I've got 54 gzip files of ~2 GB each containing the metadata of 1.1 billion taxi journeys in a folder called ~/taxi-trips. To generate this data please see the steps in my Billion Taxi Rides in Redshift blog post.

I'll make the postgres user the owner of these files as it will use its own account to access them.

$ cd ~/taxi-trips
$ sudo chown postgres trips_x*.csv.gz

The following will import all the data from each of the 54 gzip files into the trips table.

$ screen
$ for filename in *.csv.gz
  do
      echo $filename
      echo "COPY trips
            FROM PROGRAM
            'gunzip -c `pwd`/$filename'
            DELIMITER ',' CSV;" | psql trips
  done

The import took a little over an hour on my machine.
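
With an import this long it can be worth rehearsing the loop first. Below is a minimal sketch of a variant that prints each COPY statement without running it and, when run for real, stops at the first file that fails. The copy_statement helper and the DRY_RUN variable are names I've made up for illustration; ON_ERROR_STOP is a standard psql variable. It assumes file paths contain no spaces or single quotes.

```shell
# Build the COPY statement for one gzip file (same statement as the loop above).
copy_statement() {
    printf "COPY trips FROM PROGRAM 'gunzip -c %s' DELIMITER ',' CSV;\n" "$1"
}

# DRY_RUN=1 (the default here) only prints the statements; set DRY_RUN=0 to load.
DRY_RUN=${DRY_RUN:-1}

for filename in "$PWD"/*.csv.gz
do
    [ -e "$filename" ] || continue   # the glob matched nothing

    if [ "$DRY_RUN" = "1" ]; then
        copy_statement "$filename"
    else
        # ON_ERROR_STOP makes psql exit non-zero if the COPY fails.
        copy_statement "$filename" | psql -v ON_ERROR_STOP=1 trips || break
    fi
done
```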

Benchmarking Parallel Aggregate Support

I've set PostgreSQL to use 4 parallel workers to perform the query benchmarked below.

SET max_parallel_degree = 4;
\timing on

The following shows that PostgreSQL will indeed use 4 workers when the query runs.

EXPLAIN (COSTS OFF)
SELECT cab_type,
       count(*)
FROM trips
GROUP BY cab_type;
                     QUERY PLAN
----------------------------------------------------
 Finalize GroupAggregate
   Group Key: cab_type
   ->  Sort
         Sort Key: cab_type
         ->  Gather
               Number of Workers: 4
               ->  Partial HashAggregate
                     Group Key: cab_type
                     ->  Parallel Seq Scan on trips
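
The shape of that plan can be mimicked on a toy scale. The sketch below is not how PostgreSQL implements it; it's an awk simulation, with made-up rows, of the same three stages: each worker counts its own share of the rows, the partial counts are gathered and sorted, and a final pass adds them up per group.

```shell
# Toy rows: one cab_type per line.
rows="yellow
green
yellow
yellow
green
yellow
yellow
yellow"

# "Partial HashAggregate": each of 4 simulated workers counts only its share
# of rows (row N goes to worker N mod 4) and emits per-group partial counts.
partials=$(printf '%s\n' "$rows" | awk -v workers=4 '
    { partial[NR % workers, $1]++ }
    END {
        for (key in partial) {
            split(key, parts, SUBSEP)
            print parts[2], partial[key]
        }
    }')

# "Gather" + "Sort" + "Finalize GroupAggregate": sort the partial counts by
# group, then sum consecutive rows that share the same group key.
final=$(printf '%s\n' "$partials" | sort | awk '
    $1 != prev { if (NR > 1) print prev, sum; prev = $1; sum = 0 }
    { sum += $2 }
    END { if (NR > 0) print prev, sum }')

printf '%s\n' "$final"
```

The final counts match what a single pass over all eight rows would give, which is the property that lets the scan be split across workers.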

The following completed in 1 hour 1 minute and 7 seconds.

SELECT cab_type,
       count(*)
FROM trips
GROUP BY cab_type;

I've seen the above query take 3.5 hours before, so this is an improvement, but this performance won't hold a candle to any proper OLAP storage engine. Thankfully we can add an OLAP extension to PostgreSQL.
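
To put rough numbers on that improvement, the sketch below compares the two timings and estimates the scan throughput. It assumes the 3.5 hour run is comparable to this one and treats "1.1 billion" as an exact row count, so take the output as a ballpark figure.

```shell
# Convert hours, minutes and seconds to seconds.
to_seconds() {
    echo $(( $1 * 3600 + $2 * 60 + $3 ))
}

serial_s=$(to_seconds 3 30 0)     # the 3.5 hours seen previously
parallel_s=$(to_seconds 1 1 7)    # the 4-worker run above
rows=1100000000                   # assumption: "1.1 billion" taken as exact

speedup=$(LC_ALL=C awk -v a="$serial_s" -v b="$parallel_s" 'BEGIN { printf "%.1f", a / b }')
rows_per_s=$(( rows / parallel_s ))

echo "Speed-up: ${speedup}x, throughput: ${rows_per_s} rows/s"
```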

Citus Data's cstore_fdw Up and Running

The cstore_fdw extension from Citus Data adds support for PostgreSQL to store data in ORC format. This means the data is stored as columns instead of rows on the disk. This extension also allows those columns to be compressed.

I've found in the past that data stored as columns tends to be 50% of the size it would be if it were in CSV format and gzip compressed. The organisation of the data and the lower amounts of I/O that will be required to read the data should give aggregate query performance a huge boost.

The cstore_fdw extension currently only supports PostgreSQL 9.3, 9.4 and 9.5 so for this exercise I'll install PostgreSQL 9.5 on a fresh installation of Ubuntu 14.04.3 LTS.

I'll install PostgreSQL from the official Debian package but I'll also add tools to compile the cstore_fdw extension from source.

$ echo "deb http://apt.postgresql.org/pub/repos/apt/ trusty-pgdg main 9.5" | \
    sudo tee /etc/apt/sources.list.d/postgresql.list

$ gpg --keyserver pgp.mit.edu --recv-keys 7FCC7D46ACCC4CF8
$ gpg --armor --export 7FCC7D46ACCC4CF8 | sudo apt-key add -
$ sudo apt-get update

$ sudo apt-get install \
    git \
    libpq-dev \
    libprotobuf-c0-dev \
    make \
    postgresql-9.5 \
    postgresql-server-dev-9.5 \
    protobuf-c-compiler

The following will check out the latest version of the cstore_fdw extension, compile and install it.

$ git clone https://github.com/citusdata/cstore_fdw.git

$ cd cstore_fdw
$ make -j8 && \
    sudo make install

I'll then change PostgreSQL's configuration so that it knows the extension is available to use.

$ sudo su - -c \
    "sed \"s/#shared_preload_libraries = ''/shared_preload_libraries = 'cstore_fdw'/g\" /etc/postgresql/9.5/main/postgresql.conf -i"
$ sudo /etc/init.d/postgresql restart
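
Since sed edits the file in place, it can be reassuring to preview the substitution first. The sketch below runs the same expression against a scratch file rather than the real postgresql.conf; the scratch file's contents are a simplified stand-in I've made up.

```shell
# Scratch file standing in for postgresql.conf (the real one is not touched).
conf=$(mktemp)
printf "%s\n" "#shared_preload_libraries = ''" "max_connections = 100" > "$conf"

# Same substitution as above, but printed to stdout instead of edited in place.
preview=$(sed "s/#shared_preload_libraries = ''/shared_preload_libraries = 'cstore_fdw'/g" "$conf")
echo "$preview"

# Pull out just the line of interest to confirm the pattern matched.
setting=$(printf '%s\n' "$preview" | grep '^shared_preload_libraries')
rm -f "$conf"
```

If the setting has already been uncommented or given another value, the pattern won't match and the file is left unchanged, so checking the result before restarting is worthwhile.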

Since this is a fresh install of Ubuntu, I'll have to again allow my user account to create databases in PostgreSQL.

$ sudo su - postgres -c \
    "psql -c 'CREATE USER mark;
              ALTER USER mark WITH SUPERUSER;'"
$ createdb trips

I'll then add the cstore_fdw extension and foreign data wrapper to PostgreSQL.

$ sudo su - postgres -c \
    "echo 'CREATE EXTENSION cstore_fdw;
           CREATE SERVER cstore_server
                FOREIGN DATA WRAPPER cstore_fdw;' | psql trips"

I need to create a table for the trips data but this table will have a few key differences from the other versions of this table I've created in the past. The first is that primary key constraints aren't supported on foreign data wrappers so the trip_id field's data type will need to be BIGINT instead of SERIAL. The next change is that the table will be declared as a FOREIGN TABLE and contain parameters at the end of the declaration to tell the cstore_fdw extension how we want to compress and 'key frame' the data.

$ psql trips
CREATE FOREIGN TABLE trips
(
    trip_id                 BIGINT,
    vendor_id               VARCHAR(3),

    pickup_datetime         TIMESTAMP NOT NULL,

    dropoff_datetime        TIMESTAMP NOT NULL,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT NOT NULL,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT NOT NULL DEFAULT '0',
    trip_distance           DECIMAL(6,3) DEFAULT '0.0',
    fare_amount             DECIMAL(6,2) DEFAULT '0.0',
    extra                   DECIMAL(6,2) DEFAULT '0.0',
    mta_tax                 DECIMAL(6,2) DEFAULT '0.0',
    tip_amount              DECIMAL(6,2) DEFAULT '0.0',
    tolls_amount            DECIMAL(6,2) DEFAULT '0.0',
    ehail_fee               DECIMAL(6,2) DEFAULT '0.0',
    improvement_surcharge   DECIMAL(6,2) DEFAULT '0.0',
    total_amount            DECIMAL(6,2) DEFAULT '0.0',
    payment_type            VARCHAR(6),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6) NOT NULL,

    precipitation           SMALLINT DEFAULT '0',
    snow_depth              SMALLINT DEFAULT '0',
    snowfall                SMALLINT DEFAULT '0',
    max_temperature         SMALLINT DEFAULT '0',
    min_temperature         SMALLINT DEFAULT '0',
    average_wind_speed      SMALLINT DEFAULT '0',

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
)
SERVER cstore_server
OPTIONS(compression 'pglz',
        block_row_count '40000',
        stripe_row_count '600000');
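
To get a feel for what those two row-count options imply at this scale, here is a back-of-the-envelope sketch. It treats "1.1 billion" as an exact row count and ignores any partly filled stripes left at the end of each COPY, so the stripe count is a lower bound rather than a measurement.

```shell
rows=1100000000        # assumption: "1.1 billion" taken as exact
stripe_rows=600000     # stripe_row_count from the table options
block_rows=40000       # block_row_count from the table options

blocks_per_stripe=$(( stripe_rows / block_rows ))
# Round up so a final, partly filled stripe is counted.
stripes=$(( (rows + stripe_rows - 1) / stripe_rows ))

echo "$blocks_per_stripe blocks per stripe, at least $stripes stripes"
```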

Importing 1.1 Billion Records into ORC Format

I've created a ~/taxi-trips folder with the 54 gzip files of taxi trip data and set their ownership to the postgres user.

$ cd ~/taxi-trips
$ sudo chown postgres trips_x*.csv.gz

I then imported the data as usual into PostgreSQL.

$ screen
$ for filename in *.csv.gz
  do
      echo $filename
      echo "COPY trips
            FROM PROGRAM
            'gunzip -c `pwd`/$filename'
            DELIMITER ',' CSV;" | psql trips
  done

When the import is finished I can see the data is around 3.5x smaller than it was when stored using PostgreSQL's standard storage format.

$ sudo su - postgres -c "du -hs ~/9.5/main/* | grep '[0-9]G'"
81G     /var/lib/postgresql/9.5/main/cstore_fdw
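
The three sizes quoted in this post can be compared directly. The sketch below is only arithmetic on those figures; the variable names are my own.

```shell
csv_gb=104      # gzip-compressed CSV
heap_gb=281     # PostgreSQL's standard storage format
cstore_gb=81    # cstore_fdw with pglz compression

# How many times smaller than the standard storage format?
vs_heap=$(LC_ALL=C awk -v a="$heap_gb" -v b="$cstore_gb" 'BEGIN { printf "%.1f", a / b }')
# Size as a percentage of the gzip-compressed CSV files.
vs_csv=$(LC_ALL=C awk -v a="$cstore_gb" -v b="$csv_gb" 'BEGIN { printf "%.0f", 100 * a / b }')

echo "${vs_heap}x smaller than the standard format, ${vs_csv}% of the gzip CSV size"
```

In this case the columnar data works out to about 78% of the size of the gzip-compressed CSV files, rather than the 50% I've tended to see elsewhere.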

Benchmarking cstore_fdw

The following shows PostgreSQL will run a foreign scan on the cstore_fdw data when we execute a query on the trips table.

EXPLAIN (COSTS OFF)
SELECT cab_type,
       count(*)
FROM trips
GROUP BY cab_type;
                                QUERY PLAN
--------------------------------------------------------------------------
 HashAggregate
   Group Key: cab_type
   ->  Foreign Scan on trips
         CStore File: /var/lib/postgresql/9.5/main/cstore_fdw/16385/16394

The following completed in 2 minutes and 32 seconds.

SELECT cab_type,
       count(*)
FROM trips
GROUP BY cab_type;

The following completed in 2 minutes and 55 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips
GROUP BY passenger_count;

The following completed in 3 minutes and 55 seconds.

SELECT passenger_count,
       EXTRACT(year from pickup_datetime) as year,
       count(*)
FROM trips
GROUP BY passenger_count,
         year;

The following completed in 6 minutes and 8 seconds.

SELECT passenger_count,
       EXTRACT(year from pickup_datetime) as year,
       round(trip_distance) distance,
       count(*) trips
FROM trips
GROUP BY passenger_count,
         year,
         distance
ORDER BY year,
         trips desc;
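
For reference, the sketch below converts the four timings above into seconds and compares the first query with the 4-worker run from earlier. It assumes the two runs are comparable, which is a simplification given they used different PostgreSQL builds.

```shell
# Convert minutes and seconds to seconds.
to_seconds() {
    echo $(( $1 * 60 + $2 ))
}

parallel_s=3667   # 1 hour, 1 minute and 7 seconds on the row-oriented table

# The four cstore_fdw timings reported above, in order.
q1=$(to_seconds 2 32)
q2=$(to_seconds 2 55)
q3=$(to_seconds 3 55)
q4=$(to_seconds 6 8)

# Only the first query was also timed against the parallel aggregate build.
speedup=$(LC_ALL=C awk -v a="$parallel_s" -v b="$q1" 'BEGIN { printf "%.1f", a / b }')

echo "Query times in seconds: $q1 $q2 $q3 $q4"
echo "Query 1 is ${speedup}x faster than the 4-worker run"
```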

© Copyright 2014 - 2016 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.