Apache Parquet is a column-oriented file format that originated in the Hadoop community. Its architecture was inspired by Google's Dremel paper and the project originally went by the codename "Red Elm", an anagram of "Dremel". Work on the format began in late 2012 with significant contributions from Julien Le Dem and Tianshuo Deng, both of whom worked at Twitter at the time, as well as Ryan Blue, who was working for Cloudera.
A tabular dataset can be broken up into one or more Parquet files. Each file stores data by columns rather than by rows. Each column can have its own compression scheme, allowing the most efficient and/or suitable scheme to be picked for the data in that specific column. Other optimisations include dictionary encoding for columns with tens of thousands of unique values or fewer, bit packing, where small integers are stored together as a single, larger integer, and run-length encoding, where sequentially repeating values are stored as the value followed by the number of occurrences. Aggregation queries can easily run two orders of magnitude faster on column-oriented data compared to row-oriented data.
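To give an intuition for run-length encoding, here's a toy Python sketch (purely illustrative; this isn't how Parquet's encoder is actually implemented).
from itertools import groupby
# A low-cardinality column with long runs of repeated values.
values = ['yellow'] * 4 + ['green'] * 2 + ['yellow']
# Run-length encode: each run becomes a (value, run length) pair.
encoded = [(value, len(list(run))) for value, run in groupby(values)]
print(encoded)  # [('yellow', 4), ('green', 2), ('yellow', 1)]
Seven values collapse into three pairs; on a real column such as cab_type the savings are far more dramatic.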
SQLite, the second-most deployed piece of software in the world, is a database that uses a row-oriented internal file format. Out of the box it supports neither Parquet files nor HDFS, a redundant, distributed file storage system popular in the Hadoop community.
A few weeks ago I came across sqlite-parquet-vtable, an add-on library for SQLite written by Colin Dellow. This library allows SQLite to interact with Parquet files.
In this benchmark I'll see how well SQLite, Parquet and HDFS perform when querying 1.1 billion taxi trips. This dataset is made up of 1.1 billion taxi trips conducted in New York City between 2009 and 2015. This is the same dataset I've used to benchmark Amazon Athena, BigQuery, BrytlytDB, ClickHouse, Elasticsearch, EMR, kdb+/q, MapD, PostgreSQL, Redshift and Vertica. I have a single-page summary of all these benchmarks for comparison.
HDFS Up and Running
It is possible to read Parquet files without a Hadoop cluster, but in most cases where I see engineers and analysts working with Parquet files, the files are stored on a distributed file system such as HDFS. Duplicating datasets onto local machines in order to interact with them can create confusion as to which files are the most up to date, as well as the cognitive overhead of managing synchronisation processes. For these reasons I've decided to mount an HDFS file system locally so that SQLite can interact with Parquet files stored on HDFS using local file system API calls.
For this benchmark I'll be running a fresh installation of Ubuntu 16.04.2 LTS on a machine with an Intel Core i5-4670K clocked at 3.4 GHz, 8 GB of DDR3 RAM and a SanDisk SDSSDHII960G 960 GB SSD drive. I've installed Hadoop using my Hadoop 3 Single-Node Install Guide, skipping the steps for Presto and Spark.
The dataset used in this benchmark has 1.1 billion records, 51 columns and is 500 GB in size when in uncompressed CSV format. Instructions on producing the dataset can be found in my Billion Taxi Rides in Redshift blog post. The CSV files were converted into Parquet format using Hive and Snappy compression on an AWS EMR cluster. The conversion resulted in 56 Parquet files which take up 105 GB of space.
Where a workload is I/O or network bound it makes sense to keep the compressed data as compact as possible. That being said, there are cases where the workload is compute bound, and fast compression schemes like Snappy play a useful role in lowering the decompression overhead.
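The conversion in this benchmark was done with Hive on EMR as described above. For a rough idea of what an equivalent single-file conversion looks like, here is a PyArrow sketch (an assumption on my part rather than the job that produced these files; the input file name is hypothetical).
import pyarrow.csv as pv
import pyarrow.parquet as pq
# Read one CSV file into an Arrow table and write it back out as
# Snappy-compressed Parquet.
table = pv.read_csv('trips_xaa.csv')
pq.write_table(table, '000000_0', compression='snappy')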
I've downloaded the Parquet files to my local file system and imported them onto HDFS. Since this is all running on a single SSD drive I've set the HDFS replication factor to 1.
$ ls -lh parquet
total 105G
... 2.0G ... 000000_0
... 2.0G ... 000001_0
... 2.0G ... 000002_0
...
... 1.5G ... 000053_0
... 1.5G ... 000054_0
... 1.4G ... 000055_0
$ hdfs dfs -mkdir /trips_parquet
$ hdfs dfs -copyFromLocal \
parquet/0000* \
/trips_parquet/
The column names can be found within the Parquet files so, for this exercise, there is no need to create a metadata table in Hive.
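If you want to see the schema embedded in each file, PyArrow can print it (just a sketch; parquet-tools would work equally well and neither was needed for the benchmark itself).
import pyarrow.parquet as pq
# The column names and types live in the file footer, so no external
# metadata store is required.
parquet_file = pq.ParquetFile('parquet/000000_0')
print(parquet_file.schema)
print(parquet_file.metadata.num_rows)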
HDFS Mount
I'll mount HDFS onto the local file system so that SQLite can interact with it using local file system API calls. I'll be using the hadoop-hdfs-fuse utility from Cloudera's Hadoop distribution.
$ wget https://archive.cloudera.com/cdh5/ubuntu/xenial/amd64/cdh/archive.key -O - \
| sudo apt-key add -
$ wget https://archive.cloudera.com/cdh5/ubuntu/xenial/amd64/cdh/cloudera.list -O - \
| sudo tee /etc/apt/sources.list.d/cloudera.list
$ sudo apt update
$ sudo apt install hadoop-hdfs-fuse
$ cd ~
$ sudo mkdir -p hdfs_mount
$ sudo hadoop-fuse-dfs \
dfs://127.0.0.1:9000 \
hdfs_mount
Now all the data that is available via HDFS is also available via the local file system interfaces.
$ hdfs dfs -ls /trips_parquet/
Found 56 items
... 2079571240 ... /trips_parquet/000000_0
... 2053401839 ... /trips_parquet/000001_0
... 2048854327 ... /trips_parquet/000002_0
...
... 1533768646 ... /trips_parquet/000053_0
... 1548224240 ... /trips_parquet/000054_0
... 1397076662 ... /trips_parquet/000055_0
$ ls -lh hdfs_mount/trips_parquet/
total 105G
... 2.0G ... 000000_0
... 2.0G ... 000001_0
... 2.0G ... 000002_0
...
... 1.5G ... 000053_0
... 1.5G ... 000054_0
... 1.4G ... 000055_0
Parquet for SQLite
The following will install SQLite, along with a few prerequisites.
$ sudo apt install \
libboost-dev \
libboost-filesystem-dev \
libboost-system-dev \
libboost-regex-dev \
sqlite3
The sqlite-parquet-vtable project takes the form of a 14 MB libparquet.so file. As of this writing the library can be installed simply by downloading and decompressing it.
$ cd ~
$ wget https://s3.amazonaws.com/cldellow/public/libparquet/libparquet.so.xz
$ xz -d libparquet.so.xz
Once the library is loaded into SQLite, virtual tables pointing to individual Parquet files can be created.
$ sqlite3
.timer on
.load ./libparquet
I've raised a ticket asking for globbing support so that a single virtual table can point to multiple files. For this benchmark I used Python to generate the SQL commands to create 56 virtual tables pointing to each of the Parquet files on the HDFS mount. The SQL commands below have been truncated for readability purposes.
CREATE VIRTUAL TABLE trips_0 USING parquet('hdfs_mount/trips_parquet/000000_0');
CREATE VIRTUAL TABLE trips_1 USING parquet('hdfs_mount/trips_parquet/000001_0');
...
CREATE VIRTUAL TABLE trips_54 USING parquet('hdfs_mount/trips_parquet/000054_0');
CREATE VIRTUAL TABLE trips_55 USING parquet('hdfs_mount/trips_parquet/000055_0');
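A Python snippet along these lines will generate the 56 statements above (a sketch of the approach rather than the exact script I ran).
# Emit one CREATE VIRTUAL TABLE statement per Parquet file. The file
# names follow the 000000_0 .. 000055_0 pattern shown earlier.
for i in range(56):
    print("CREATE VIRTUAL TABLE trips_%d USING parquet("
          "'hdfs_mount/trips_parquet/%06d_0');" % (i, i))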
Benchmarking SQLite
The times quoted below are the lowest query times seen during a series of runs. As with all my benchmarks, I use the lowest query time as a way of indicating "top speed".
Because the 1.1 billion records sit across 56 virtual tables, I've had to create views in SQLite federating the data I'm querying. The SELECT queries themselves have been modified to work with the semi-aggregated data.
This is the view I set up for query #1. The command executes right away and won't cause any load on the system until I query the view. I've truncated the view for readability purposes.
CREATE VIEW query_1_view AS
SELECT cab_type,
COUNT(*) as cnt
FROM trips_0
GROUP BY cab_type
UNION ALL
SELECT cab_type,
COUNT(*) as cnt
FROM trips_1
GROUP BY cab_type
UNION ALL
...
SELECT cab_type,
COUNT(*) as cnt
FROM trips_54
GROUP BY cab_type
UNION ALL
SELECT cab_type,
COUNT(*) as cnt
FROM trips_55
GROUP BY cab_type;
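The view DDL can be generated the same way. Here's a sketch for query #1's view.
# Build one SELECT per virtual table and glue them together with UNION ALL.
selects = [
    "SELECT cab_type, COUNT(*) AS cnt FROM trips_%d GROUP BY cab_type" % i
    for i in range(56)
]
print("CREATE VIEW query_1_view AS\n" + "\nUNION ALL\n".join(selects) + ";")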
The following completed in 7 minutes and 28 seconds.
SELECT cab_type,
SUM(cnt)
FROM query_1_view
GROUP BY cab_type;
Watching top and iotop I noted that only a single core was being maxed out by SQLite on my 4-core system. HDFS and the FUSE utility took up 31% of another core. The SSD was being read at 120 MB/s.
The following is the view for query #2. I've truncated the view for readability purposes.
CREATE VIEW query_2_view AS
SELECT passenger_count,
total_amount
FROM trips_0
UNION ALL
SELECT passenger_count,
total_amount
FROM trips_1
UNION ALL
...
SELECT passenger_count,
total_amount
FROM trips_54
UNION ALL
SELECT passenger_count,
total_amount
FROM trips_55;
The following completed in 13 minutes and 17 seconds.
SELECT passenger_count,
AVG(total_amount)
FROM query_2_view
GROUP BY passenger_count;
The following is the view for query #3. I've truncated the view for readability purposes.
CREATE VIEW query_3_view AS
SELECT passenger_count,
STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
COUNT(*) AS num_records
FROM trips_0
GROUP BY passenger_count,
pickup_year
UNION ALL
SELECT passenger_count,
STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
COUNT(*) AS num_records
FROM trips_1
GROUP BY passenger_count,
pickup_year
UNION ALL
...
SELECT passenger_count,
STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
COUNT(*) AS num_records
FROM trips_54
GROUP BY passenger_count,
pickup_year
UNION ALL
SELECT passenger_count,
STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
COUNT(*) AS num_records
FROM trips_55
GROUP BY passenger_count,
pickup_year;
The following completed in 30 minutes and 11 seconds.
SELECT passenger_count,
pickup_year,
SUM(num_records)
FROM query_3_view
GROUP BY passenger_count,
pickup_year;
The following is the view for query #4. I've truncated the view for readability purposes.
CREATE VIEW query_4_view AS
SELECT passenger_count,
STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
ROUND(trip_distance) AS distance,
COUNT(*) AS num_records
FROM trips_0
GROUP BY passenger_count,
pickup_year,
distance
UNION ALL
SELECT passenger_count,
STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
ROUND(trip_distance) AS distance,
COUNT(*) AS num_records
FROM trips_1
GROUP BY passenger_count,
pickup_year,
distance
UNION ALL
...
SELECT passenger_count,
STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
ROUND(trip_distance) AS distance,
COUNT(*) AS num_records
FROM trips_54
GROUP BY passenger_count,
pickup_year,
distance
UNION ALL
SELECT passenger_count,
STRFTIME('%Y', DATETIME(pickup_datetime / 1000, 'unixepoch')) AS pickup_year,
ROUND(trip_distance) AS distance,
COUNT(*) AS num_records
FROM trips_55
GROUP BY passenger_count,
pickup_year,
distance;
The following completed in 54 minutes and 46 seconds.
SELECT passenger_count,
pickup_year,
distance,
SUM(num_records) AS the_count
FROM query_4_view
GROUP BY passenger_count,
pickup_year,
distance
ORDER BY pickup_year,
the_count DESC;
Comparing to SQLite's Internal Format
I'll install a multi-core gzip drop-in replacement and create a regular table in SQLite.
$ sudo apt install pigz
$ sqlite3 taxi.db
CREATE TABLE trips (
trip_id INTEGER,
vendor_id TEXT,
pickup_datetime TEXT,
dropoff_datetime TEXT,
store_and_fwd_flag TEXT,
rate_code_id INTEGER,
pickup_longitude REAL,
pickup_latitude REAL,
dropoff_longitude REAL,
dropoff_latitude REAL,
passenger_count INTEGER,
trip_distance REAL,
fare_amount REAL,
extra REAL,
mta_tax REAL,
tip_amount REAL,
tolls_amount REAL,
ehail_fee REAL,
improvement_surcharge REAL,
total_amount REAL,
payment_type TEXT,
trip_type INTEGER,
pickup TEXT,
dropoff TEXT,
cab_type TEXT,
precipitation INTEGER,
snow_depth INTEGER,
snowfall INTEGER,
max_temperature INTEGER,
min_temperature INTEGER,
average_wind_speed INTEGER,
pickup_nyct2010_gid INTEGER,
pickup_ctlabel TEXT,
pickup_borocode INTEGER,
pickup_boroname TEXT,
pickup_ct2010 TEXT,
pickup_boroct2010 TEXT,
pickup_cdeligibil TEXT,
pickup_ntacode TEXT,
pickup_ntaname TEXT,
pickup_puma TEXT,
dropoff_nyct2010_gid INTEGER,
dropoff_ctlabel TEXT,
dropoff_borocode INTEGER,
dropoff_boroname TEXT,
dropoff_ct2010 TEXT,
dropoff_boroct2010 TEXT,
dropoff_cdeligibil TEXT,
dropoff_ntacode TEXT,
dropoff_ntaname TEXT,
dropoff_puma TEXT
);
I'll import the same dataset, albeit from 56 gzip-compressed CSV files. I wasn't able to find a reliable way to pipe in the decompressed data via stdin so I'm using a temporary file to read decompressed CSV data into SQLite.
$ vi import.sql
pragma journal_mode=memory;
.mode csv
.separator ','
.import taxi.temp trips
$ for FILENAME in trips_x*.csv.gz; do
echo $FILENAME
pigz -d -c $FILENAME > taxi.temp
sqlite3 taxi.db < import.sql
rm taxi.temp
done
The above completed in 5.5 hours and produced a SQLite file that is 529 GB in size. If I used a compressed file system for that SQLite file it would sit at 137 GB.
The following completed in 8 hours, 39 minutes and 20 seconds.
SELECT cab_type,
COUNT(*)
FROM trips
GROUP BY cab_type;
Note, the above query was executed with the SQLite file stored on a mechanical drive, so throughput was capped at around 120 MB/s. Merely reading the file off of an uncompressed file system, let alone processing it in any way, would take 75 minutes, assuming the drive could maintain its top speed (which is unlikely). Using a compressed file system would bring this down to 19 minutes of I/O time, but that is still a long way off what the Parquet files can offer.
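For reference, those I/O estimates are simply the file sizes divided by the drive's throughput.
# Back-of-the-envelope scan times at 120 MB/s of sequential read throughput.
throughput_mb_per_sec = 120
for label, size_gb in (('uncompressed', 529), ('compressed', 137)):
    minutes = size_gb * 1024 / throughput_mb_per_sec / 60
    print('%s: ~%.0f minutes' % (label, minutes))
# uncompressed: ~75 minutes
# compressed:   ~19 minutes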
Closing Thoughts
SQLite is an amazing piece of software. It's small, well-tested and can be installed virtually anywhere. I do use SQLite for analysis with Jupyter Notebooks. Pivot tables, heat maps over Google Maps, virtually any sort of visualisation I can imagine can be powered by SQLite. When I'm working with a few hundred thousand rows in pivot tables, or millions of rows with less interactive workloads, the performance is more than acceptable.
With that being said, I'm not recommending migrating existing analytical backends to SQLite and Parquet. But if you do find yourself in a situation where access to infrastructure and/or permission to install software is limited, this guide could help speed up your analytical workloads compared to traditional OLTP engines, and give a sense of the sorts of performance deltas to expect between OLTP and OLAP setups.
I can imagine the benchmark times seen with the Parquet add-on could be improved dramatically if SQLite's execution engine could be distributed across CPU cores and systems on top of something like Dask.distributed. I'd love to see that come to life.