Presto is a query engine that began life at Facebook five years ago. Today it's used by over 1,000 Facebook staff members to analyse 300+ petabytes of data kept in their data warehouse. Presto doesn't store any data itself; instead it interfaces with existing databases. It's common for Presto to query ORC-formatted files on HDFS or S3, but a wide variety of other storage systems are supported as well.
In many organisations there is no single database that holds all of the information. ETL processes can be useful for copying data between databases, but there are cases where company policy doesn't allow certain data to be copied around, or where the overhead of keeping the copies fresh is too much of a burden.
Presto can query multiple databases in the same query. The data can be stored in different pieces of database software, and that software doesn't even need to keep data in a tabular form; document stores, key-value stores and streams work just as well. Presto can run a SQL query against a Kafka topic stream while joining dimensional data from PostgreSQL, Redis, MongoDB and ORC-formatted files on HDFS, all in the same query.
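To give a flavour of what that looks like, below is an illustrative sketch using Presto's CLI and two of the catalogs configured later in this post. It joins the MySQL and PostgreSQL copies of the same trips table on their trip identifier; it isn't one of the queries I run further down, just an example of the catalog.schema.table addressing Presto uses.
$ presto --execute "
    SELECT m.cab_type,
           COUNT(*)
    FROM mysql.taxi.trips AS m
    JOIN postgresql.public.trips AS p
        ON m.trip_id = p.trip_id
    GROUP BY m.cab_type"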
Presto is a very fast query engine but will ultimately be limited by the databases it's connecting to. Presto running an aggregation query on a billion records in ORC format stored on HDFS will almost always outperform running the same query against a MySQL server.
In this blog post I'll walk through the setup and data imports for five databases and then query them using Presto 0.196. The databases and versions being used are Kafka 1.1.0, MongoDB 3.2.19, MySQL 5.7.21, PostgreSQL 9.5.12 and Redis 3.0.6.
I'll be using the first 1,000 records from the dataset I use in my 1.1 Billion Taxi Rides benchmarks. This dataset has 51 fields comprising a variety of data types. To save time I've cast all of the fields as VARCHARs when mapping them in the non-tabular storage engines.
The machine I'm using has an Intel Core i5-4670K CPU clocked at 3.40 GHz, 12 GB of RAM and 200 GB of NVMe SSD storage. I'll be using a fresh installation of Ubuntu 16.04.2 LTS with a single-node Hadoop installation built off the instructions in my Hadoop 3 Single-Node Install Guide blog post. The instructions for installing Presto can be found in that guide.
Installing Five Different Databases
I'll first add MongoDB's repository details to my Ubuntu installation. There will be a complaint that "The following signatures couldn't be verified because the public key is not available". Despite this, MongoDB will still install. The problem is described in detail here.
$ sudo apt-key adv \
--keyserver hkp://keyserver.ubuntu.com:80 \
--recv EA312927
$ echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu xenial/mongodb-org/3.6 multiverse" | \
sudo tee /etc/apt/sources.list.d/mongodb-org.list
$ sudo apt update
I'll then use Ubuntu's package management to install everything except Kafka.
$ sudo apt install \
mongodb-org \
mysql-server \
postgresql \
redis-server \
zookeeperd
By default, MySQL and PostgreSQL are both set up to start after installation and each time the system boots. The following will do the same for MongoDB and Redis.
$ for SERVICE in mongod redis-server; do
sudo systemctl start $SERVICE
sudo systemctl enable $SERVICE
done
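If you want to confirm those services came up, systemd can report each unit's state. The unit names below match the packages installed above on Ubuntu 16.04; they may differ on other distributions.
$ for SERVICE in mongod mysql postgresql redis-server; do
echo -n "$SERVICE: "
systemctl is-active $SERVICE
done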
I'll install Kafka manually using the binary package distributed by one of Apache's mirrors.
$ sudo mkdir /opt/kafka
$ wget -c -O kafka.tgz \
http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
$ sudo tar xzvf kafka.tgz \
--directory=/opt/kafka \
--strip 1
I'll then create a log file for Kafka which will be owned by my UNIX account.
$ sudo touch /var/log/kafka.log
$ sudo chown mark /var/log/kafka.log
I'll then launch Kafka's server process.
$ sudo nohup /opt/kafka/bin/kafka-server-start.sh \
/opt/kafka/config/server.properties \
> /var/log/kafka.log 2>&1 &
Importing Data into Kafka
I'll create a Kafka topic called "trips" with both a replication factor and partition count of 1. These aren't values you'd use in a production system; they're for educational purposes only.
$ /opt/kafka/bin/kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic trips
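Kafka can report the topic's configuration back, which is a quick way to confirm the replication factor and partition count took effect.
$ /opt/kafka/bin/kafka-topics.sh \
--describe \
--zookeeper localhost:2181 \
--topic trips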
The 1.1 billion records are stored in CSV format and are split up into 56 GZIP-compressed files. I'll decompress the first 1,000 lines from the first of the 56 GZIP files and import those records into the "trips" Kafka topic.
$ gunzip -c trips_xaa.csv.gz \
| head -n1000 \
| /opt/kafka/bin/kafka-console-producer.sh \
--topic trips \
--broker-list localhost:9092
The data will be stored as raw CSV in Kafka, with one line of CSV content per event. Later in this blog post I'll create configuration in Presto that casts each Kafka event into a named, typed, tabular form.
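To see that raw CSV form for yourself, Kafka's console consumer can print a single event from the topic. This is just a sanity check and isn't required for anything that follows.
$ /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic trips \
--from-beginning \
--max-messages 1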
Importing Data into MongoDB
When importing into MongoDB I'll use a file to name each of the fields in the CSV data.
$ vi fields.list
trip_id
vendor_id
pickup_datetime
dropoff_datetime
store_and_fwd_flag
rate_code_id
pickup_longitude
pickup_latitude
dropoff_longitude
dropoff_latitude
passenger_count
trip_distance
fare_amount
extra
mta_tax
tip_amount
tolls_amount
ehail_fee
improvement_surcharge
total_amount
payment_type
trip_type
pickup
dropoff
cab_type
precipitation
snow_depth
snowfall
max_temperature
min_temperature
average_wind_speed
pickup_nyct2010_gid
pickup_ctlabel
pickup_borocode
pickup_boroname
pickup_ct2010
pickup_boroct2010
pickup_cdeligibil
pickup_ntacode
pickup_ntaname
pickup_puma
dropoff_nyct2010_gid
dropoff_ctlabel
dropoff_borocode
dropoff_boroname
dropoff_ct2010
dropoff_boroct2010
dropoff_cdeligibil
dropoff_ntacode
dropoff_ntaname
dropoff_puma
That file listing the field names is passed via the --fieldFile parameter below. Again, I'm importing the first 1,000 records of the taxi trip dataset.
$ zcat trips_xaa.csv.gz \
| head -n1000 \
| mongoimport \
--db taxi \
--collection trips \
--type csv \
--fieldFile fields.list \
--numInsertionWorkers 4
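As a quick sanity check, the mongo shell can count the documents in the collection; this should print 1000.
$ mongo taxi \
--eval "printjson(db.trips.count())"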
Importing Data into MySQL
Before importing any data into MySQL I'll create access credentials. The following will create an account with mark as the username, test as the password and grant all privileges on the taxi database.
$ sudo bash -c "mysql -uroot -p \
-e\"CREATE USER 'mark'@'localhost' IDENTIFIED BY 'test';
GRANT ALL PRIVILEGES ON taxi.* TO 'mark'@'localhost';
FLUSH PRIVILEGES;\""
MySQL doesn't support importing GZIP data directly via STDIN so I'll decompress 1,000 records into a CSV file before connecting to MySQL.
$ gunzip -c trips_xaa.csv.gz \
| head -n1000 \
> trips.csv
$ mysql -umark -p
Below I'll create a new "taxi" database and a "trips" table within it.
CREATE DATABASE taxi;
USE taxi
CREATE TABLE trips (
trip_id INT,
vendor_id VARCHAR(3),
pickup_datetime DATETIME,
dropoff_datetime DATETIME,
store_and_fwd_flag VARCHAR(1),
rate_code_id SMALLINT,
pickup_longitude DECIMAL(18,14),
pickup_latitude DECIMAL(18,14),
dropoff_longitude DECIMAL(18,14),
dropoff_latitude DECIMAL(18,14),
passenger_count SMALLINT,
trip_distance DECIMAL(6,3),
fare_amount DECIMAL(6,2),
extra DECIMAL(6,2),
mta_tax DECIMAL(6,2),
tip_amount DECIMAL(6,2),
tolls_amount DECIMAL(6,2),
ehail_fee DECIMAL(6,2),
improvement_surcharge DECIMAL(6,2),
total_amount DECIMAL(6,2),
payment_type VARCHAR(3),
trip_type SMALLINT,
pickup VARCHAR(50),
dropoff VARCHAR(50),
cab_type VARCHAR(6),
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel VARCHAR(10),
pickup_borocode SMALLINT,
pickup_boroname VARCHAR(13),
pickup_ct2010 VARCHAR(6),
pickup_boroct2010 VARCHAR(7),
pickup_cdeligibil VARCHAR(1),
pickup_ntacode VARCHAR(4),
pickup_ntaname VARCHAR(56),
pickup_puma VARCHAR(4),
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel VARCHAR(10),
dropoff_borocode SMALLINT,
dropoff_boroname VARCHAR(13),
dropoff_ct2010 VARCHAR(6),
dropoff_boroct2010 VARCHAR(7),
dropoff_cdeligibil VARCHAR(1),
dropoff_ntacode VARCHAR(4),
dropoff_ntaname VARCHAR(56),
dropoff_puma VARCHAR(4)
);
The following will load the uncompressed CSV file into the trips table.
LOAD DATA
LOCAL INFILE 'trips.csv'
INTO TABLE trips
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
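Back in the shell, a quick row count will confirm the load brought in all 1,000 records.
$ mysql -umark -p taxi \
-e "SELECT COUNT(*) FROM trips"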
Importing Data into PostgreSQL
As I did with MySQL, I'll create an account, database and trips table in PostgreSQL.
$ sudo -u postgres \
bash -c "psql -c \"CREATE USER mark
WITH PASSWORD 'test'
SUPERUSER;\""
$ createdb taxi
$ psql taxi
CREATE TABLE trips (
trip_id INT,
vendor_id VARCHAR(3),
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
store_and_fwd_flag VARCHAR(1),
rate_code_id SMALLINT,
pickup_longitude DECIMAL(18,14),
pickup_latitude DECIMAL(18,14),
dropoff_longitude DECIMAL(18,14),
dropoff_latitude DECIMAL(18,14),
passenger_count SMALLINT,
trip_distance DECIMAL(6,3),
fare_amount DECIMAL(6,2),
extra DECIMAL(6,2),
mta_tax DECIMAL(6,2),
tip_amount DECIMAL(6,2),
tolls_amount DECIMAL(6,2),
ehail_fee DECIMAL(6,2),
improvement_surcharge DECIMAL(6,2),
total_amount DECIMAL(6,2),
payment_type VARCHAR(3),
trip_type SMALLINT,
pickup VARCHAR(50),
dropoff VARCHAR(50),
cab_type VARCHAR(6),
precipitation SMALLINT,
snow_depth SMALLINT,
snowfall SMALLINT,
max_temperature SMALLINT,
min_temperature SMALLINT,
average_wind_speed SMALLINT,
pickup_nyct2010_gid SMALLINT,
pickup_ctlabel VARCHAR(10),
pickup_borocode SMALLINT,
pickup_boroname VARCHAR(13),
pickup_ct2010 VARCHAR(6),
pickup_boroct2010 VARCHAR(7),
pickup_cdeligibil VARCHAR(1),
pickup_ntacode VARCHAR(4),
pickup_ntaname VARCHAR(56),
pickup_puma VARCHAR(4),
dropoff_nyct2010_gid SMALLINT,
dropoff_ctlabel VARCHAR(10),
dropoff_borocode SMALLINT,
dropoff_boroname VARCHAR(13),
dropoff_ct2010 VARCHAR(6),
dropoff_boroct2010 VARCHAR(7),
dropoff_cdeligibil VARCHAR(1),
dropoff_ntacode VARCHAR(4),
dropoff_ntaname VARCHAR(56),
dropoff_puma VARCHAR(4)
);
PostgreSQL does support importing uncompressed CSV data via STDIN. Below is the command I ran to import 1,000 taxi trip records.
$ zcat trips_xaa.csv.gz \
| head -n1000 \
| psql taxi \
-c "COPY trips
FROM stdin CSV;"
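The same kind of sanity check works for PostgreSQL.
$ psql taxi \
-c "SELECT COUNT(*) FROM trips"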
Importing Data into Redis
Redis is a key-value store; it doesn't store data in a tabular form the way PostgreSQL and MySQL do. Below I'll import 1,000 taxi trip records. Each record's key will be "trip_" followed by the trip's numeric identifier, and its value will be the raw line of CSV data.
$ zcat trips_xaa.csv.gz \
| head -n1000 \
| awk -F, '{ print " SET", "\"trip_"$1"\"", "\""$0"\"" }' \
| redis-cli
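redis-cli can confirm the keyspace now holds 1,000 keys and show a few of the generated key names.
$ redis-cli DBSIZE
$ redis-cli --scan --pattern 'trip_*' \
| head -n3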
Configuring Presto's Connections
In order for Presto to query Kafka, a connector must be set up in Presto's catalog folder. Note that the names of any tables you want to use must be specified in this file.
$ sudo vi /opt/presto/etc/catalog/kafka.properties
connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=trips
kafka.hide-internal-columns=false
Kafka stores each record as a raw CSV string. The following will cast each field in these records into a named and typed form. I've used VARCHAR for every field, but a wider variety of types is supported.
$ sudo mkdir -p /opt/presto/etc/kafka
$ sudo vi /opt/presto/etc/kafka/trips.json
{
"tableName": "trips",
"schemaName": "default",
"topicName": "trips",
"message": {
"dataFormat": "csv",
"fields": [
{"type": "VARCHAR", "mapping": 0, "name": "trip_id"},
{"type": "VARCHAR", "mapping": 1, "name": "vendor_id"},
{"type": "VARCHAR", "mapping": 2, "name": "pickup_datetime"},
{"type": "VARCHAR", "mapping": 3, "name": "dropoff_datetime"},
{"type": "VARCHAR", "mapping": 4, "name": "store_and_fwd_flag"},
{"type": "VARCHAR", "mapping": 5, "name": "rate_code_id"},
{"type": "VARCHAR", "mapping": 6, "name": "pickup_longitude"},
{"type": "VARCHAR", "mapping": 7, "name": "pickup_latitude"},
{"type": "VARCHAR", "mapping": 8, "name": "dropoff_longitude"},
{"type": "VARCHAR", "mapping": 9, "name": "dropoff_latitude"},
{"type": "VARCHAR", "mapping": 10, "name": "passenger_count"},
{"type": "VARCHAR", "mapping": 11, "name": "trip_distance"},
{"type": "VARCHAR", "mapping": 12, "name": "fare_amount"},
{"type": "VARCHAR", "mapping": 13, "name": "extra"},
{"type": "VARCHAR", "mapping": 14, "name": "mta_tax"},
{"type": "VARCHAR", "mapping": 15, "name": "tip_amount"},
{"type": "VARCHAR", "mapping": 16, "name": "tolls_amount"},
{"type": "VARCHAR", "mapping": 17, "name": "ehail_fee"},
{"type": "VARCHAR", "mapping": 18, "name": "improvement_surcharge"},
{"type": "VARCHAR", "mapping": 19, "name": "total_amount"},
{"type": "VARCHAR", "mapping": 20, "name": "payment_type"},
{"type": "VARCHAR", "mapping": 21, "name": "trip_type"},
{"type": "VARCHAR", "mapping": 22, "name": "pickup"},
{"type": "VARCHAR", "mapping": 23, "name": "dropoff"},
{"type": "VARCHAR", "mapping": 24, "name": "cab_type"},
{"type": "VARCHAR", "mapping": 25, "name": "precipitation"},
{"type": "VARCHAR", "mapping": 26, "name": "snow_depth"},
{"type": "VARCHAR", "mapping": 27, "name": "snowfall"},
{"type": "VARCHAR", "mapping": 28, "name": "max_temperature"},
{"type": "VARCHAR", "mapping": 29, "name": "min_temperature"},
{"type": "VARCHAR", "mapping": 30, "name": "average_wind_speed"},
{"type": "VARCHAR", "mapping": 31, "name": "pickup_nyct2010_gid"},
{"type": "VARCHAR", "mapping": 32, "name": "pickup_ctlabel"},
{"type": "VARCHAR", "mapping": 33, "name": "pickup_borocode"},
{"type": "VARCHAR", "mapping": 34, "name": "pickup_boroname"},
{"type": "VARCHAR", "mapping": 35, "name": "pickup_ct2010"},
{"type": "VARCHAR", "mapping": 36, "name": "pickup_boroct2010"},
{"type": "VARCHAR", "mapping": 37, "name": "pickup_cdeligibil"},
{"type": "VARCHAR", "mapping": 38, "name": "pickup_ntacode"},
{"type": "VARCHAR", "mapping": 39, "name": "pickup_ntaname"},
{"type": "VARCHAR", "mapping": 40, "name": "pickup_puma"},
{"type": "VARCHAR", "mapping": 41, "name": "dropoff_nyct2010_gid"},
{"type": "VARCHAR", "mapping": 42, "name": "dropoff_ctlabel"},
{"type": "VARCHAR", "mapping": 43, "name": "dropoff_borocode"},
{"type": "VARCHAR", "mapping": 44, "name": "dropoff_boroname"},
{"type": "VARCHAR", "mapping": 45, "name": "dropoff_ct2010"},
{"type": "VARCHAR", "mapping": 46, "name": "dropoff_boroct2010"},
{"type": "VARCHAR", "mapping": 47, "name": "dropoff_cdeligibil"},
{"type": "VARCHAR", "mapping": 48, "name": "dropoff_ntacode"},
{"type": "VARCHAR", "mapping": 49, "name": "dropoff_ntaname"},
{"type": "VARCHAR", "mapping": 50, "name": "dropoff_puma"}
]
}
}
The MongoDB import already named each field in each record, so only the connector details are needed for Presto to communicate with it.
$ sudo vi /opt/presto/etc/catalog/mongodb.properties
connector.name=mongodb
mongodb.seeds=127.0.0.1
The following will give Presto the credentials and connection details for MySQL.
$ sudo vi /opt/presto/etc/catalog/mysql.properties
connector.name=mysql
connection-url=jdbc:mysql://127.0.0.1:3306
connection-user=mark
connection-password=test
The following will give Presto the credentials and connection details for PostgreSQL.
$ sudo vi /opt/presto/etc/catalog/postgresql.properties
connector.name=postgresql
connection-url=jdbc:postgresql://127.0.0.1:5432/taxi
connection-user=mark
connection-password=test
The following will give Presto the connection details for Redis. The schema and table names being used are also declared in this file.
$ sudo vi /opt/presto/etc/catalog/redis.properties
connector.name=redis
redis.table-names=schema1.trips
redis.nodes=localhost:6379
Redis stores each record in raw CSV format. The following defines a schema that names and type-casts each CSV field so the data is usable in tabular form in Presto.
$ sudo mkdir -p /opt/presto/etc/redis
$ sudo vi /opt/presto/etc/redis/trips.json
{
"tableName": "trips",
"schemaName": "schema1",
"value": {
"dataFormat": "csv",
"fields": [
{"type": "VARCHAR", "mapping": 0, "name": "trip_id"},
{"type": "VARCHAR", "mapping": 1, "name": "vendor_id"},
{"type": "VARCHAR", "mapping": 2, "name": "pickup_datetime"},
{"type": "VARCHAR", "mapping": 3, "name": "dropoff_datetime"},
{"type": "VARCHAR", "mapping": 4, "name": "store_and_fwd_flag"},
{"type": "VARCHAR", "mapping": 5, "name": "rate_code_id"},
{"type": "VARCHAR", "mapping": 6, "name": "pickup_longitude"},
{"type": "VARCHAR", "mapping": 7, "name": "pickup_latitude"},
{"type": "VARCHAR", "mapping": 8, "name": "dropoff_longitude"},
{"type": "VARCHAR", "mapping": 9, "name": "dropoff_latitude"},
{"type": "VARCHAR", "mapping": 10, "name": "passenger_count"},
{"type": "VARCHAR", "mapping": 11, "name": "trip_distance"},
{"type": "VARCHAR", "mapping": 12, "name": "fare_amount"},
{"type": "VARCHAR", "mapping": 13, "name": "extra"},
{"type": "VARCHAR", "mapping": 14, "name": "mta_tax"},
{"type": "VARCHAR", "mapping": 15, "name": "tip_amount"},
{"type": "VARCHAR", "mapping": 16, "name": "tolls_amount"},
{"type": "VARCHAR", "mapping": 17, "name": "ehail_fee"},
{"type": "VARCHAR", "mapping": 18, "name": "improvement_surcharge"},
{"type": "VARCHAR", "mapping": 19, "name": "total_amount"},
{"type": "VARCHAR", "mapping": 20, "name": "payment_type"},
{"type": "VARCHAR", "mapping": 21, "name": "trip_type"},
{"type": "VARCHAR", "mapping": 22, "name": "pickup"},
{"type": "VARCHAR", "mapping": 23, "name": "dropoff"},
{"type": "VARCHAR", "mapping": 24, "name": "cab_type"},
{"type": "VARCHAR", "mapping": 25, "name": "precipitation"},
{"type": "VARCHAR", "mapping": 26, "name": "snow_depth"},
{"type": "VARCHAR", "mapping": 27, "name": "snowfall"},
{"type": "VARCHAR", "mapping": 28, "name": "max_temperature"},
{"type": "VARCHAR", "mapping": 29, "name": "min_temperature"},
{"type": "VARCHAR", "mapping": 30, "name": "average_wind_speed"},
{"type": "VARCHAR", "mapping": 31, "name": "pickup_nyct2010_gid"},
{"type": "VARCHAR", "mapping": 32, "name": "pickup_ctlabel"},
{"type": "VARCHAR", "mapping": 33, "name": "pickup_borocode"},
{"type": "VARCHAR", "mapping": 34, "name": "pickup_boroname"},
{"type": "VARCHAR", "mapping": 35, "name": "pickup_ct2010"},
{"type": "VARCHAR", "mapping": 36, "name": "pickup_boroct2010"},
{"type": "VARCHAR", "mapping": 37, "name": "pickup_cdeligibil"},
{"type": "VARCHAR", "mapping": 38, "name": "pickup_ntacode"},
{"type": "VARCHAR", "mapping": 39, "name": "pickup_ntaname"},
{"type": "VARCHAR", "mapping": 40, "name": "pickup_puma"},
{"type": "VARCHAR", "mapping": 41, "name": "dropoff_nyct2010_gid"},
{"type": "VARCHAR", "mapping": 42, "name": "dropoff_ctlabel"},
{"type": "VARCHAR", "mapping": 43, "name": "dropoff_borocode"},
{"type": "VARCHAR", "mapping": 44, "name": "dropoff_boroname"},
{"type": "VARCHAR", "mapping": 45, "name": "dropoff_ct2010"},
{"type": "VARCHAR", "mapping": 46, "name": "dropoff_boroct2010"},
{"type": "VARCHAR", "mapping": 47, "name": "dropoff_cdeligibil"},
{"type": "VARCHAR", "mapping": 48, "name": "dropoff_ntacode"},
{"type": "VARCHAR", "mapping": 49, "name": "dropoff_ntaname"},
{"type": "VARCHAR", "mapping": 50, "name": "dropoff_puma"}
]
}
}
With all those configuration changes in place I'll restart the Presto server process.
$ sudo /opt/presto/bin/launcher restart
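Once Presto is back up, SHOW CATALOGS is a quick way to confirm that all five connectors registered correctly.
$ presto --execute "SHOW CATALOGS"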
Querying Multiple Databases with Presto
The following will launch Presto's CLI and then query the five databases in a single query. Note that the naming convention used here is catalogue.schema.table.
$ presto
SELECT (
SELECT COUNT(*) FROM kafka.default.trips
), (
SELECT COUNT(*) FROM mysql.taxi.trips
), (
SELECT COUNT(*) FROM redis.schema1.trips
), (
SELECT COUNT(*) FROM mongodb.taxi.trips
), (
SELECT COUNT(*) FROM postgresql.public.trips
);
_col0 | _col1 | _col2 | _col3 | _col4
-------+-------+-------+-------+-------
1000 | 1000 | 1000 | 1000 | 1000
The same query can also be executed via Python. The following will install Python and PyHive, along with a few other dependencies.
$ sudo apt install \
python \
python-pip \
virtualenv
$ virtualenv .pyhive
$ source .pyhive/bin/activate
$ pip install pyhive requests
I'll launch Python's REPL and execute the same query as above.
$ python
from pyhive import presto
sql = """SELECT (
SELECT COUNT(*) FROM kafka.default.trips
), (
SELECT COUNT(*) FROM mysql.taxi.trips
), (
SELECT COUNT(*) FROM redis.schema1.trips
), (
SELECT COUNT(*) FROM mongodb.taxi.trips
), (
SELECT COUNT(*) FROM postgresql.public.trips
)"""
cursor = presto.connect('0.0.0.0').cursor()
cursor.execute(sql)
cursor.fetchall()
The output looks like the following.
[(1000, 1000, 1000, 1000, 1000)]
Presto can also export query results to CSV format. The following will collect all the events from Kafka's trips topic and output them to a file.
$ presto \
--execute "SELECT * FROM kafka.default.trips" \
--output-format CSV_HEADER \
> trips_from_kafka.csv
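If you want to check the export, wc and head should show a header row followed by the 1,000 trips pulled from Kafka.
$ wc -l trips_from_kafka.csv
$ head -n1 trips_from_kafka.csv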