
Posted on Thu 28 June 2018

1.1 Billion Taxi Rides with SQLite, Parquet & HDFS

Apache Parquet is a column-oriented file format that originated in the Hadoop community. Its architecture was inspired by Google's Dremel paper and originally went by the anagram "Red Elm". Work began on the format in late 2012 and had significant contributions from Julien Le Dem and Tianshuo Deng, both of whom worked at Twitter at the time, as well as Ryan Blue, who was working for Cloudera.

A tabular dataset can be broken up into one or more Parquet files. Each file stores data by columns rather than by rows. Each column can have its own compression scheme, allowing for the most efficient and/or suitable scheme based on the data in that specific column. Other optimisations include dictionary encoding for columns where the number of unique values is in the five-figure range or lower, bit packing, where small integers are stored together as a single, larger integer, and run-length encoding, where sequentially repeating values are stored as the value followed by the number of occurrences. Aggregation queries can easily run two orders of magnitude faster on column-oriented data compared to row-oriented data.
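The three encodings mentioned above can be illustrated with a toy sketch. This is not Parquet's actual on-disk layout; the function names and the sample cab_type values are made up for illustration only.

```python
from itertools import groupby


def run_length_encode(values):
    """Collapse runs of repeated values into (value, run_length) pairs."""
    return [(value, len(list(run))) for value, run in groupby(values)]


def dictionary_encode(values):
    """Replace each value with a small integer index into a list of unique values."""
    dictionary = []
    positions = {}
    codes = []
    for value in values:
        if value not in positions:
            positions[value] = len(dictionary)
            dictionary.append(value)
        codes.append(positions[value])
    return dictionary, codes


def bit_pack(codes, bit_width):
    """Pack small non-negative integers into one larger integer, bit_width bits each."""
    packed = 0
    for position, code in enumerate(codes):
        assert 0 <= code < (1 << bit_width)
        packed |= code << (position * bit_width)
    return packed


# Made-up sample column; real Parquet columns are encoded page by page.
cab_types = ["yellow", "yellow", "yellow", "green", "green", "yellow"]

dictionary, codes = dictionary_encode(cab_types)  # strings become small integers
runs = run_length_encode(codes)                   # repeated codes become runs
packed = bit_pack(codes, bit_width=1)             # two distinct codes fit in 1 bit each

print(dictionary, codes, runs, packed)
```

With only two distinct values, each code needs a single bit, so six strings collapse into one small integer plus a two-entry dictionary.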

SQLite, the second-most deployed piece of software in the world, is a database that uses an internal file format that is row-oriented. Out of the box, it supports neither Parquet files nor HDFS, a redundant, distributed file storage system popular in the Hadoop community.

A few weeks ago, I came across sqlite-parquet-vtable, an add-on library for SQLite written by Colin Dellow. This software allows SQLite to interact with Parquet files.

In this benchmark I'll see how well SQLite, Parquet and HDFS perform when querying 1.1 billion taxi trips. This dataset is made up of 1.1 billion taxi trips conducted in New York City between 2009 and 2015. This is the same dataset I've used to benchmark Amazon Athena, BigQuery, BrytlytDB, ClickHouse, Elasticsearch, EMR, kdb+/q, MapD, PostgreSQL, Redshift and Vertica. I have a single-page summary of all these benchmarks for comparison.

HDFS Up and Running

It is possible to read Parquet files without a Hadoop cluster, but in most cases where I see engineers and analysts working with Parquet files, the files are stored on a distributed file system such as HDFS. Duplicating datasets onto local machines in order to interact with them can create confusion as to which files are most up to date, as well as the cognitive overhead of managing synchronisation processes. For these reasons I've decided to mount an HDFS file system locally so that SQLite can interact with Parquet files stored on HDFS using local file system API calls.

For this benchmark I'll be running a fresh installation of Ubuntu 16.04.2 LTS on an Intel Core i5 4670K clocked at 3.4 GHz, 8 GB of DDR3 RAM and a SanDisk SDSSDHII960G 960 GB SSD drive. I've installed Hadoop using my Hadoop 3 Single-Node Install Guide skipping the steps for Presto and Spark.

The dataset used in this benchmark has 1.1 billion records, 51 columns and is 500 GB in size when in uncompressed CSV format. Instructions on producing the dataset can be found in my Billion Taxi Rides in Redshift blog post. The CSV files were converted into Parquet format using Hive and Snappy compression on an AWS EMR cluster. The conversion resulted in 56 Parquet files which take up 105 GB of space.

Where a workload is I/O or network bound it makes sense to keep the compressed data as compact as possible. That being said, there are cases where a workload is compute bound, and lightweight compression schemes like Snappy play a useful role in lowering the decompression overhead.

I've downloaded the Parquet files to my local file system and imported them onto HDFS. Since this is all running on a single SSD drive I've set the HDFS replication factor to 1.

$ ls -lh parquet
total 105G
... 2.0G ... 000000_0
... 2.0G ... 000001_0
... 2.0G ... 000002_0
...
... 1.5G ... 000053_0
... 1.5G ... 000054_0
... 1.4G ... 000055_0
$ hdfs dfs -mkdir /trips_parquet
$ hdfs dfs -copyFromLocal \
    parquet/0000* \
    /trips_parquet/

The column names can be found within the Parquet files so, for this exercise, there is no need to create a metadata table in Hive.

HDFS Mount

I'll mount an HDFS connection to the local file system. This way SQLite can interact with HDFS using local file system API calls. I'll be using the hadoop-hdfs-fuse utility from Cloudera's Hadoop distribution.

$ wget https://archive.cloudera.com/cdh5/ubuntu/xenial/amd64/cdh/archive.key -O - \
    | sudo apt-key add -
$ wget https://archive.cloudera.com/cdh5/ubuntu/xenial/amd64/cdh/cloudera.list -O - \
    | sudo tee /etc/apt/sources.list.d/cloudera.list
$ sudo apt update
$ sudo apt install hadoop-hdfs-fuse
$ cd ~
$ sudo mkdir -p hdfs_mount
$ sudo hadoop-fuse-dfs \
    dfs://127.0.0.1:9000 \
    hdfs_mount

Now all the data that is available via HDFS is also available via the local file system interfaces.

$ hdfs dfs -ls /trips_parquet/
Found 56 items
... 2079571240 ... /trips_parquet/000000_0
... 2053401839 ... /trips_parquet/000001_0
... 2048854327 ... /trips_parquet/000002_0
...
... 1533768646 ... /trips_parquet/000053_0
... 1548224240 ... /trips_parquet/000054_0
... 1397076662 ... /trips_parquet/000055_0
$ ls -lh hdfs_mount/trips_parquet/
total 105G
... 2.0G ... 000000_0
... 2.0G ... 000001_0
... 2.0G ... 000002_0
...
... 1.5G ... 000053_0
... 1.5G ... 000054_0
... 1.4G ... 000055_0
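As a quick sanity check that ordinary file APIs see real Parquet data through a mount like this, one could look for the 4-byte "PAR1" magic that unencrypted Parquet files carry at both the start and the end. The sketch below is an assumption-laden illustration: the helper name is hypothetical and the demo runs against throwaway temporary files rather than the actual hdfs_mount directory.

```python
import os
import tempfile

PARQUET_MAGIC = b"PAR1"


def looks_like_parquet(path):
    """Return True if the file starts and ends with the Parquet magic bytes."""
    if os.path.getsize(path) < 2 * len(PARQUET_MAGIC):
        return False
    with open(path, "rb") as handle:
        head = handle.read(len(PARQUET_MAGIC))
        handle.seek(-len(PARQUET_MAGIC), os.SEEK_END)
        tail = handle.read(len(PARQUET_MAGIC))
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC


# Demo with stand-in files; these are not real Parquet files.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "000000_0"), "wb") as handle:
        handle.write(PARQUET_MAGIC + b"column data would go here" + PARQUET_MAGIC)
    with open(os.path.join(tmp, "notes.txt"), "wb") as handle:
        handle.write(b"just some text")

    results = {name: looks_like_parquet(os.path.join(tmp, name))
               for name in sorted(os.listdir(tmp))}

print(results)
```

Pointing the same function at files under hdfs_mount/trips_parquet/ would exercise the FUSE mount using nothing but local file system calls.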

Parquet for SQLite

The following will install SQLite, along with a few prerequisites.

$ sudo apt-get install \
    libboost-dev \
    libboost-filesystem-dev \
    libboost-system-dev \
    libboost-regex-dev \
    sqlite3

The sqlite-parquet-vtable project takes the form of a 14 MB libparquet.so file. As of this writing the library can be installed simply by decompressing a download.

$ cd ~
$ wget https://s3.amazonaws.com/cldellow/public/libparquet/libparquet.so.xz
$ xz -d libparquet.so.xz

Once the library is loaded into SQLite, virtual tables pointing to individual Parquet files can be created.

$ sqlite3
.timer on
.load ./libparquet

I've raised a ticket asking for globbing support so that a single virtual table can point to multiple files. For this benchmark I used Python to generate the SQL commands to create 56 virtual tables, one for each of the Parquet files on the HDFS mount. The SQL commands below have been truncated for readability purposes.

CREATE VIRTUAL TABLE trips_0 USING parquet('hdfs_mount/trips_parquet/000000_0');
CREATE VIRTUAL TABLE trips_1 USING parquet('hdfs_mount/trips_parquet/000001_0');
...
CREATE VIRTUAL TABLE trips_54 USING parquet('hdfs_mount/trips_parquet/000054_0');
CREATE VIRTUAL TABLE trips_55 USING parquet('hdfs_mount/trips_parquet/000055_0');
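The generator script itself isn't shown in this post. A minimal sketch of what such a script could look like follows; the function name and parameters are hypothetical, and the path pattern is taken from the statements above.

```python
def virtual_table_statements(num_files=56, mount="hdfs_mount/trips_parquet"):
    """Return one CREATE VIRTUAL TABLE statement per Parquet file."""
    statements = []
    for n in range(num_files):
        # Files are named 000000_0 through 000055_0 on the mount.
        path = "%s/%06d_0" % (mount, n)
        statements.append(
            "CREATE VIRTUAL TABLE trips_%d USING parquet('%s');" % (n, path))
    return statements


if __name__ == "__main__":
    print("\n".join(virtual_table_statements()))
```

The printed output could be saved to a file and fed to the sqlite3 shell after the .load command.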

Benchmarking SQLite

The times quoted below are the lowest query times seen during a series of runs. As with all my benchmarks, I use the lowest query time as a way of indicating "top speed".

Because the 1.1 billion records sit across 56 virtual tables I've had to create views in SQLite federating the data I'm querying. The SELECT queries themselves have been modified to work with the semi-aggregated data.

This is the view I set up for query #1. The command executes right away and won't cause any load on the system until I query the view. I've truncated the view for readability purposes.

CREATE VIEW query_1_view AS
    SELECT cab_type,
           COUNT(*) as cnt
    FROM trips_0
    GROUP BY cab_type
    UNION ALL
    SELECT cab_type,
           COUNT(*) as cnt
    FROM trips_1
    GROUP BY cab_type
    UNION ALL
    ...
    SELECT cab_type,
           COUNT(*) as cnt
    FROM trips_54
    GROUP BY cab_type
    UNION ALL
    SELECT cab_type,
           COUNT(*) as cnt
    FROM trips_55
    GROUP BY cab_type;

The following completed in 7 minutes and 28 seconds.

SELECT cab_type,
       SUM(cnt)
FROM query_1_view
GROUP BY cab_type;
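The two-stage pattern used here (count per table inside the view, then sum the partial counts) can be tried out on a small scale. In the sketch below three tiny ordinary tables with made-up rows stand in for the 56 Parquet virtual tables, and the helper that builds the UNION ALL view is hypothetical.

```python
import sqlite3


def union_all_view(view_name, select_template, num_tables):
    """Build a CREATE VIEW statement that repeats one SELECT per table."""
    selects = [select_template.format(table="trips_%d" % n)
               for n in range(num_tables)]
    return "CREATE VIEW %s AS\n%s;" % (view_name, "\nUNION ALL\n".join(selects))


QUERY_1_TEMPLATE = """SELECT cab_type,
       COUNT(*) AS cnt
FROM {table}
GROUP BY cab_type"""

# Made-up sample data: one list of cab types per stand-in table.
sample_rows = [
    ["yellow", "yellow", "green"],
    ["green", "yellow"],
    ["yellow"],
]

conn = sqlite3.connect(":memory:")
for n, rows in enumerate(sample_rows):
    conn.execute("CREATE TABLE trips_%d (cab_type TEXT)" % n)
    conn.executemany("INSERT INTO trips_%d VALUES (?)" % n,
                     [(cab_type,) for cab_type in rows])

conn.execute(union_all_view("query_1_view", QUERY_1_TEMPLATE, len(sample_rows)))

# The outer query sums the per-table counts, as in query #1 above.
totals = conn.execute("""SELECT cab_type,
                                SUM(cnt)
                         FROM query_1_view
                         GROUP BY cab_type
                         ORDER BY cab_type""").fetchall()
print(totals)
```

Summing partial counts gives the same totals as counting every row in one pass, which is why the outer query uses SUM rather than COUNT.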

Watching top and iotop I noted only a single core was being maxed out by SQLite on my 4-core system. HDFS and the fuse utility took up 31% of another core. The SSD was being read at 120 MB/s.

The following is the view for query #2. I've truncated the view for readability purposes.

CREATE VIEW query_2_view AS
    SELECT passenger_count,
           total_amount
    FROM trips_0
    UNION ALL
    SELECT passenger_count,
           total_amount
    FROM trips_1
    UNION ALL
    ...
    SELECT passenger_count,
           total_amount
    FROM trips_54
    UNION ALL
    SELECT passenger_count,
           total_amount
    FROM trips_55;

The following completed in 13 minutes and 17 seconds.

SELECT passenger_count,
       AVG(total_amount)
FROM query_2_view
GROUP BY passenger_count;
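Unlike query #1, this view passes raw rows through rather than pre-aggregating per table. One thing worth keeping in mind with averages is that an average of per-table averages is not, in general, the overall average. The sketch below uses two made-up tables to show this, along with a hypothetical pre-aggregated alternative that carries sums and counts instead. It was not part of the benchmark and I haven't measured whether it would be any faster.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips_0 (passenger_count INTEGER, total_amount REAL)")
conn.execute("CREATE TABLE trips_1 (passenger_count INTEGER, total_amount REAL)")
conn.executemany("INSERT INTO trips_0 VALUES (?, ?)",
                 [(1, 10.0), (1, 20.0), (1, 30.0)])
conn.executemany("INSERT INTO trips_1 VALUES (?, ?)", [(1, 60.0)])

# Same shape as query_2_view: raw rows from every table.
conn.execute("""CREATE VIEW raw_view AS
    SELECT passenger_count, total_amount FROM trips_0
    UNION ALL
    SELECT passenger_count, total_amount FROM trips_1""")

# Hypothetical alternative: per-table sums and counts that can be recombined.
conn.execute("""CREATE VIEW partial_view AS
    SELECT passenger_count, SUM(total_amount) AS total, COUNT(total_amount) AS cnt
    FROM trips_0 GROUP BY passenger_count
    UNION ALL
    SELECT passenger_count, SUM(total_amount) AS total, COUNT(total_amount) AS cnt
    FROM trips_1 GROUP BY passenger_count""")

true_avg = conn.execute(
    "SELECT AVG(total_amount) FROM raw_view GROUP BY passenger_count"
).fetchone()[0]

combined_avg = conn.execute(
    "SELECT SUM(total) / SUM(cnt) FROM partial_view GROUP BY passenger_count"
).fetchone()[0]

# Averaging the two per-table averages weights the single-row table too heavily.
avg_of_avgs = conn.execute("""SELECT AVG(a) FROM (
    SELECT AVG(total_amount) AS a FROM trips_0
    UNION ALL
    SELECT AVG(total_amount) AS a FROM trips_1)""").fetchone()[0]

print(true_avg, combined_avg, avg_of_avgs)
```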

The following is the view for query #3. I've truncated the view for readability purposes.

CREATE VIEW query_3_view AS
    SELECT passenger_count,
           STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
           COUNT(*) AS num_records
    FROM trips_0
    GROUP BY passenger_count,
             pickup_year
    UNION ALL
    SELECT passenger_count,
           STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
           COUNT(*) AS num_records
    FROM trips_1
    GROUP BY passenger_count,
             pickup_year
    UNION ALL
    ...
    SELECT passenger_count,
           STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
           COUNT(*) AS num_records
    FROM trips_54
    GROUP BY passenger_count,
             pickup_year
    UNION ALL
    SELECT passenger_count,
           STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
           COUNT(*) AS num_records
    FROM trips_55
    GROUP BY passenger_count,
             pickup_year;

The following completed in 30 minutes and 11 seconds.

SELECT passenger_count,
       pickup_year,
       SUM(num_records)
FROM query_3_view
GROUP BY passenger_count,
         pickup_year;
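The division by 1000 in the view suggests that pickup_datetime is held as milliseconds since the Unix epoch, whereas SQLite's 'unixepoch' modifier expects seconds. A small sketch of that conversion follows, using a made-up timestamp and cross-checking the result against Python's own datetime handling.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")

# Made-up example: 2015-06-15 12:00:00 UTC in milliseconds since the epoch.
pickup_ms = int(datetime(2015, 6, 15, 12, tzinfo=timezone.utc).timestamp() * 1000)

# Same expression as in the view: milliseconds -> seconds -> datetime -> year.
year_from_sqlite = conn.execute(
    "SELECT STRFTIME('%Y', DATETIME(? / 1000, 'unixepoch'))", (pickup_ms,)
).fetchone()[0]

year_from_python = datetime.fromtimestamp(
    pickup_ms / 1000, tz=timezone.utc).strftime("%Y")

print(pickup_ms, year_from_sqlite, year_from_python)
```

Note that STRFTIME returns the year as text, so pickup_year groups and sorts as a string.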

The following is the view for query #4. I've truncated the view for readability purposes.

CREATE VIEW query_4_view AS
    SELECT passenger_count,
           STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
           ROUND(trip_distance) AS distance,
           COUNT(*) AS num_records
    FROM trips_0
    GROUP BY passenger_count,
             pickup_year,
             distance
    UNION ALL
    SELECT passenger_count,
           STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
           ROUND(trip_distance) AS distance,
           COUNT(*) AS num_records
    FROM trips_1
    GROUP BY passenger_count,
             pickup_year,
             distance
    UNION ALL
    ...
    SELECT passenger_count,
           STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
           ROUND(trip_distance) AS distance,
           COUNT(*) AS num_records
    FROM trips_54
    GROUP BY passenger_count,
             pickup_year,
             distance
    UNION ALL
    SELECT passenger_count,
           STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
           ROUND(trip_distance) AS distance,
           COUNT(*) AS num_records
    FROM trips_55
    GROUP BY passenger_count,
             pickup_year,
             distance;

The following completed in 54 minutes and 46 seconds.

SELECT passenger_count,
       pickup_year,
       distance,
       SUM(num_records) AS the_count
FROM query_4_view
GROUP BY passenger_count,
         pickup_year,
         distance
ORDER BY pickup_year,
         the_count DESC;

Comparing to SQLite's Internal Format

I'll install a multi-core gzip drop-in replacement and create a regular table in SQLite.

$ sudo apt install pigz
$ sqlite3 taxi.db
CREATE TABLE trips (
    trip_id                 INTEGER,
    vendor_id               TEXT,

    pickup_datetime         TEXT,

    dropoff_datetime        TEXT,
    store_and_fwd_flag      TEXT,
    rate_code_id            INTEGER,
    pickup_longitude        REAL,
    pickup_latitude         REAL,
    dropoff_longitude       REAL,
    dropoff_latitude        REAL,
    passenger_count         INTEGER,
    trip_distance           REAL,
    fare_amount             REAL,
    extra                   REAL,
    mta_tax                 REAL,
    tip_amount              REAL,
    tolls_amount            REAL,
    ehail_fee               REAL,
    improvement_surcharge   REAL,
    total_amount            REAL,
    payment_type            TEXT,
    trip_type               INTEGER,
    pickup                  TEXT,
    dropoff                 TEXT,

    cab_type                TEXT,

    precipitation           INTEGER,
    snow_depth              INTEGER,
    snowfall                INTEGER,
    max_temperature         INTEGER,
    min_temperature         INTEGER,
    average_wind_speed      INTEGER,

    pickup_nyct2010_gid     INTEGER,
    pickup_ctlabel          TEXT,
    pickup_borocode         INTEGER,
    pickup_boroname         TEXT,
    pickup_ct2010           TEXT,
    pickup_boroct2010       TEXT,
    pickup_cdeligibil       TEXT,
    pickup_ntacode          TEXT,
    pickup_ntaname          TEXT,
    pickup_puma             TEXT,

    dropoff_nyct2010_gid    INTEGER,
    dropoff_ctlabel         TEXT,
    dropoff_borocode        INTEGER,
    dropoff_boroname        TEXT,
    dropoff_ct2010          TEXT,
    dropoff_boroct2010      TEXT,
    dropoff_cdeligibil      TEXT,
    dropoff_ntacode         TEXT,
    dropoff_ntaname         TEXT,
    dropoff_puma            TEXT
);

I'll import the same dataset, albeit from 56 gzip-compressed CSV files. I wasn't able to find a reliable way to pipe in the decompressed data via stdin so I'm using a temporary file to read decompressed CSV data into SQLite.

$ vi import.sql
pragma journal_mode=memory;
.mode csv
.separator ','
.import taxi.temp trips
$ for FILENAME in trips_x*.csv.gz; do
      echo "$FILENAME"
      pigz -d -c "$FILENAME" > taxi.temp
      sqlite3 taxi.db < import.sql
      rm taxi.temp
  done
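One way to avoid the temporary file altogether would be to stream the gzip-compressed CSV straight into SQLite from Python. The following is a sketch only: the function name, batch size and three-column sample table are assumptions, it was not used for this benchmark and I haven't tested it at this scale, where it may well be slower than the shell's .import command.

```python
import csv
import gzip
import itertools
import os
import sqlite3
import tempfile


def import_gzipped_csv(conn, gz_path, table, num_columns, batch_size=100_000):
    """Stream rows from a gzip-compressed CSV file into an existing table."""
    placeholders = ", ".join("?" * num_columns)
    insert = "INSERT INTO %s VALUES (%s)" % (table, placeholders)
    inserted = 0
    with gzip.open(gz_path, mode="rt", newline="") as handle:
        reader = csv.reader(handle)
        while True:
            # Pull a bounded batch so the whole file never sits in memory.
            batch = list(itertools.islice(reader, batch_size))
            if not batch:
                break
            conn.executemany(insert, batch)
            inserted += len(batch)
    conn.commit()
    return inserted


# Demo against a tiny made-up file and a cut-down, three-column table.
with tempfile.TemporaryDirectory() as tmp:
    sample = os.path.join(tmp, "trips_sample.csv.gz")
    with gzip.open(sample, mode="wt", newline="") as handle:
        csv.writer(handle).writerows([[1, "yellow", 2.5], [2, "green", 7.0]])

    demo = sqlite3.connect(":memory:")
    demo.execute(
        "CREATE TABLE trips (trip_id INTEGER, cab_type TEXT, total_amount REAL)")
    row_count = import_gzipped_csv(demo, sample, "trips", num_columns=3)
    rows = demo.execute("SELECT * FROM trips ORDER BY trip_id").fetchall()

print(row_count, rows)
```

The CSV reader yields every field as a string; SQLite's column affinity converts them to integers and reals on insert where the column types call for it.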

The above completed in 5.5 hours and produced a SQLite file that was 529 GB uncompressed. If I used a compressed file system for that SQLite file it would sit at 137 GB in size.

The following completed in 8 hours, 39 minutes and 20 seconds.

SELECT cab_type,
       COUNT(*)
FROM trips
GROUP BY cab_type;

Note that the above query was executed against a SQLite file stored on a mechanical drive, so throughput was capped at 120 MB/s. Simply reading the file from an uncompressed file system, before processing it in any way, would take 75 minutes, assuming the drive could maintain top speed (which is unlikely). Using a compressed file system would bring this down to 19 minutes of I/O time but this is still a long way from what Parquet files can offer.
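The I/O estimates above follow from dividing file size by sustained throughput. A small sketch of the arithmetic is below; it assumes 1 GB = 1024 MB and a drive that holds 120 MB/s for the whole read, and the function name is made up.

```python
def minutes_to_read(size_gb, throughput_mb_per_sec=120):
    """Minutes needed to read size_gb at a sustained throughput (1 GB = 1024 MB)."""
    return size_gb * 1024 / throughput_mb_per_sec / 60


uncompressed = minutes_to_read(529)  # SQLite file on an uncompressed file system
compressed = minutes_to_read(137)    # same file on a compressed file system

print(round(uncompressed), round(compressed))
```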

Closing Thoughts

SQLite is an amazing piece of software. It's small, well-tested and can be installed virtually anywhere. I do use SQLite for doing analysis with Jupyter Notebooks. Pivot tables, heat maps over Google Maps, virtually any sort of visualisation I can imagine, can be powered by SQLite. When I'm working with a few hundred thousand rows in Pivot tables or millions of rows with less interactive workloads, the performance is more than acceptable.

With that being said, I'm not recommending migrating existing analytical backends to SQLite and Parquet. But, if you do find yourself in a situation where access to infrastructure and/or permissions to install software is limited, this guide could help both with speeding up your analytical workloads over using traditional OLTP engines and with giving guidance on what sorts of performance deltas to expect between OLTP and OLAP setups.

I can imagine the benchmark times seen with the Parquet add-on could be improved dramatically if SQLite's execution engine could be distributed across CPU cores and systems on top of something like Dask.distributed. I'd love to see that come to life.

Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2017 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.