This blog post will cover how I took a billion+ records containing six years of taxi ride metadata in New York City and analysed them using Elasticsearch on a single machine. This is the fifth blog post in a series in which I've looked at importing the same dataset into Redshift, into Spark, and into Hive and Presto, both on a local cluster and on EMR.
Apache Lucene is an indexing and search library that was developed by, among others, Doug Cutting, Erik Hatcher and Yonik Seeley in its early years. Elasticsearch, principally developed for many years by Shay Banon, sits on top of Lucene and, among many other features, allows users to communicate with Lucene via HTTP and JSON and, with the help of 3rd-party plugins, SQL.
Indexing new data into Elasticsearch / Lucene can be more taxing than in other data stores, but once the data is in place you have one of the richest selections of querying capabilities on offer. On top of that, aggregation performance is tremendous, even when using just a single machine.
UPDATE: Since writing this blog post Zachary Tong of Elastic reached out with a number of recommendations on importing this dataset. These are highlighted in a follow-up blog post "All 1.1 Billion Taxi Rides in Elasticsearch".
Hardware & Bottlenecks
The machine I used when creating this blog post has the following components.
- An Intel Core i5-4670K Haswell Quad-Core 3.4 GHz Processor
- 16 GB of RAM
- An 850 GB SSD drive that can perform random I/O up to 98K IOPS. This was used as the primary drive and as storage for Elasticsearch's data.
- A second, 1 TB mechanical drive used for storing the taxi trip data in its raw form. The taxi trip data was read from here when it was imported into Elasticsearch.
If you're using a single machine, and no networking is involved when inserting documents into Elasticsearch, the first bottleneck will be single-core CPU performance. Elasticsearch can use multiple cores, so the second bottleneck will be the number of cores in your CPU. The third bottleneck will be the write speed of the disk containing the Lucene indices and the fourth the read speed of the disk you're importing your data from.
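If you want a rough idea of where your own disks stand, hdparm can report buffered sequential read speeds. The device names below are assumptions; substitute your own:

$ sudo hdparm -t /dev/sda  # SSD holding the Lucene indices
$ sudo hdparm -t /dev/sdb  # mechanical drive holding the raw CSV data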
Installing Dependencies
The following will install Java 8 (update 74 as of this writing), Elasticsearch 2.1.1 (which uses Lucene 5.3.1) and Logstash 2.2. This was run on a fresh install of Ubuntu 14.04.3 LTS.
$ echo "deb http://packages.elastic.co/elasticsearch/2.x/debian stable main" | \
sudo tee /etc/apt/sources.list.d/elasticsearch.list
$ echo "deb http://packages.elastic.co/logstash/2.2/debian stable main" | \
sudo tee /etc/apt/sources.list.d/logstash.list
$ gpg --keyserver pgp.mit.edu --recv-keys D88E42B4
$ gpg --armor --export D88E42B4 | sudo apt-key add -
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt update
$ sudo apt install \
elasticsearch=2.1.1 \
logstash \
oracle-java8-installer
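Once those finish it's worth double-checking the versions that were installed:

$ java -version
$ dpkg -s elasticsearch logstash | grep -E '^(Package|Version)'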
Elasticsearch Up and Running
The following are the configuration options I used for Elasticsearch.
$ sudo vi /etc/elasticsearch/elasticsearch.yml
bootstrap.mlockall: true
cluster.routing.allocation.disk.threshold_enabled: true
cluster.routing.allocation.disk.watermark.low: .98
cluster.routing.allocation.disk.watermark.high: .99
index.codec: best_compression
index.number_of_replicas: 0
index.refresh_interval: -1
indices.fielddata.cache.size: 25%
I've benchmarked the memory in my system reading at 18 GB / second, around 36x faster than I've seen the SSD perform. To ensure Elasticsearch's memory is never swapped out to disk I've set bootstrap.mlockall to true, which locks the process's memory in RAM.
I've made sure the disk watermarks are very high. If disk usage on the machine ever crossed these thresholds, Elasticsearch would stop allocating new shards to the node (and, beyond the high watermark, would try to relocate existing shards) until more nodes were added to the cluster to balance out the disk usage.
To minimise disk space usage I've set the indexing codec to best_compression and turned off replicas. To minimise any overhead during the ingestion process I've disabled index refreshing. I've set the field data cache to 25% of the memory available to Elasticsearch.
I also made two changes to the JVM settings for Elasticsearch.
$ sudo vi /etc/init.d/elasticsearch
Uncomment the lines declaring ES_HEAP_SIZE and ES_JAVA_OPTS and set their values to the following:
ES_HEAP_SIZE=8g
ES_JAVA_OPTS="-XX:-UseParNewGC -XX:-UseConcMarkSweepGC -XX:+UseG1GC -XX:+UseStringDeduplication"
The first line allows 8 GB of memory to be used for Java's heap space. The second tells Java to use the G1 Garbage Collector with string de-duplication (available as of Java 8 update 20) and to disable both parallel young-generation collection and the Concurrent Mark Sweep Garbage Collector.
I ran the following to install an SQL interface for Elasticsearch.
$ sudo /usr/share/elasticsearch/bin/plugin install \
https://github.com/NLPchina/elasticsearch-sql/releases/download/2.1.1/elasticsearch-sql-2.1.1.zip
I then restarted Elasticsearch to enable the changes above.
$ sudo /etc/init.d/elasticsearch restart
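With Elasticsearch back up it's worth confirming the settings took effect. The process stats report whether the memory lock succeeded and the _cat API can show the heap size (I'm recalling the ES 2.x response layouts from memory, so treat these as a sketch):

$ curl 'localhost:9200/_nodes/process?pretty'  # look for "mlockall": true
$ curl 'localhost:9200/_cat/nodes?h=heap.max'  # should report ~8gb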
Importing a Billion Trips into Elasticsearch
The machine used in this blog post has two physical disk drives. The first is an SSD drive used for the operating system, applications and to store the data indexed by Elasticsearch. The second drive is a mechanical drive that holds the denormalised CSV data created in the Billion Taxi Rides in Redshift blog post. This second drive is mounted at /one_tb_drive on my machine.
I'll be using Logstash to import the data into Elasticsearch. The CSV data is 104 GB when gzip-compressed. Unfortunately, at the time of this writing, Logstash doesn't support reading CSV data from gzip files. To get around this I'll need to decompress all the gzip files and store them in their raw form. This raises the disk space requirements from 104 GB for the gzip data to ~500 GB for the raw CSV data.
$ cd /one_tb_drive/taxi-trips/
$ gunzip *.gz
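To confirm the space requirements after decompression:

$ du -sh /one_tb_drive/taxi-trips/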
The following will create the configuration file for Logstash.
$ vi ~/trips.conf
input {
file {
path => "/one_tb_drive/taxi-trips/*.csv"
type => "trip"
start_position => "beginning"
}
}
filter {
csv {
columns => ["trip_id",
"vendor_id",
"pickup_datetime",
"dropoff_datetime",
"store_and_fwd_flag",
"rate_code_id",
"pickup_longitude",
"pickup_latitude",
"dropoff_longitude",
"dropoff_latitude",
"passenger_count",
"trip_distance",
"fare_amount",
"extra",
"mta_tax",
"tip_amount",
"tolls_amount",
"ehail_fee",
"improvement_surcharge",
"total_amount",
"payment_type",
"trip_type",
"pickup",
"dropoff",
"cab_type",
"precipitation",
"snow_depth",
"snowfall",
"max_temperature",
"min_temperature",
"average_wind_speed",
"pickup_nyct2010_gid",
"pickup_ctlabel",
"pickup_borocode",
"pickup_boroname",
"pickup_ct2010",
"pickup_boroct2010",
"pickup_cdeligibil",
"pickup_ntacode",
"pickup_ntaname",
"pickup_puma",
"dropoff_nyct2010_gid",
"dropoff_ctlabel",
"dropoff_borocode",
"dropoff_boroname",
"dropoff_ct2010",
"dropoff_boroct2010",
"dropoff_cdeligibil",
"dropoff_ntacode",
"dropoff_ntaname",
"dropoff_puma"]
separator => ","
}
mutate {
remove_field => ["average_wind_speed",
"dropoff",
"dropoff_borocode",
"dropoff_boroct2010",
"dropoff_boroname",
"dropoff_cdeligibil",
"dropoff_ct2010",
"dropoff_ctlabel",
"dropoff_datetime",
"dropoff_latitude",
"dropoff_longitude",
"dropoff_ntacode",
"dropoff_ntaname",
"dropoff_nyct2010_gid",
"dropoff_puma",
"ehail_fee",
"extra",
"fare_amount",
"host",
"improvement_surcharge",
"max_temperature",
"message",
"min_temperature",
"mta_tax",
"path",
"payment_type",
"pickup",
"pickup_borocode",
"pickup_boroct2010",
"pickup_boroname",
"pickup_cdeligibil",
"pickup_ct2010",
"pickup_ctlabel",
"pickup_latitude",
"pickup_longitude",
"pickup_ntacode",
"pickup_ntaname",
"pickup_nyct2010_gid",
"pickup_puma",
"precipitation",
"rate_code_id",
"snow_depth",
"snowfall",
"store_and_fwd_flag",
"tip_amount",
"tolls_amount",
"trip_id",
"trip_type",
"type",
"vendor_id"]
}
date {
match => ["pickup_datetime", "YYYY-MM-dd HH:mm:ss"]
timezone => "America/New_York"
target => "pickup_datetime"
}
mutate {
convert => {
"cab_type" => "string"
"passenger_count" => "integer"
"total_amount" => "float"
"trip_distance" => "float"
}
}
}
output {
elasticsearch {
action => "index"
hosts => "localhost:9200"
index => "trips"
}
}
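The output block above uses the plugin's default bulk-request settings. The elasticsearch output in Logstash 2.2 also accepts workers and flush_size parameters which can be worth experimenting with for throughput. The values below are untuned guesses for a quad-core machine, not what I used:

output {
    elasticsearch {
        action => "index"
        hosts => "localhost:9200"
        index => "trips"
        workers => 4        # parallel output workers (assumption)
        flush_size => 5000  # documents per bulk request (assumption)
    }
}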
The following will launch the Logstash process in a screen. It will read the configuration file passed to it and begin the import process. Logstash was able to read all ~500 GB of CSV data into Elasticsearch and have it indexed in just under 70 hours on my machine.
$ screen
$ /opt/logstash/bin/logstash -f ~/trips.conf
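While the import runs you can keep an eye on the document count and index size from another terminal:

$ curl 'localhost:9200/_cat/indices/trips?v'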
It's wise to call the optimize endpoint every few hours or so. It'll clear out a few gigabytes of space that's no longer needed by Elasticsearch.
$ curl localhost:9200/trips/_optimize
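Note that Elasticsearch 2.1 renamed this endpoint to _forcemerge; _optimize still works but is deprecated, so the forward-compatible equivalent would be:

$ curl -XPOST localhost:9200/trips/_forcemerge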
Why aren't you importing every field?
The above import process will not import every field from the CSV files into Elasticsearch. If it did import all of them they would take up around 1,711 GB of space. There currently isn't a single 2 TB SSD drive with enough formatted capacity on an ext4 partition to store 1,711 GB of index data and still stay clear of the high watermark Elasticsearch is configured with.
Even if the 2 TB SSD drives did have enough formatted space, they currently sell for £500+ each. The Intel SSDPECME032T401 SSD drive has 3.2 TB of space, which should be more than enough, but the £6,192.18 price tag is about what I've paid in rent for my flat over the past two years.
Another option would be to buy multiple SSD drives of the same size with a low cost per gigabyte and run them in a RAID-0 configuration.
For the sake of the queries I'll run below I'll only be importing a few choice fields. The space requirements for 1.1 billion records containing the cab_type, passenger_count, pickup_datetime, total_amount and trip_distance fields will be 102 GB when stored by Elasticsearch.
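With the configuration above, Logstash lets Elasticsearch infer the field types dynamically as documents arrive. If you'd rather pin them down, you could create the index with an explicit mapping before starting the import. This is a sketch of what that might look like for the fields above, not something I ran for this post:

$ curl -XPUT localhost:9200/trips -d '{
      "mappings": {
          "trip": {
              "properties": {
                  "pickup_datetime": {"type": "date"},
                  "cab_type":        {"type": "string", "index": "not_analyzed"},
                  "passenger_count": {"type": "integer"},
                  "total_amount":    {"type": "float"},
                  "trip_distance":   {"type": "float"}
              }
          }
      }
  }'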
Benchmarking Queries in Elasticsearch
These queries finished faster than in any other data store I've benchmarked this data against. Unfortunately, because Elasticsearch is only indexing a subset of the fields, I cannot directly compare these results with those I've obtained from other benchmarks. Nonetheless, these query times are very impressive.
Each query was run three times in a row and the lowest query time was taken. The longest query times could be upwards of 100% longer than the shortest.
The following completed in 8.1 seconds.
SELECT cab_type,
count(*)
FROM trips
GROUP BY cab_type
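For reference, the SQL plugin exposes an _sql endpoint over HTTP, so each of these statements can be submitted with curl. The URL form below is how I'd expect it to look based on the plugin's README, so treat it as a sketch:

$ curl -G localhost:9200/_sql \
    --data-urlencode "sql=SELECT cab_type, count(*) FROM trips GROUP BY cab_type"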
The following completed in 18.18 seconds.
SELECT passenger_count,
avg(total_amount)
FROM trips
GROUP BY passenger_count
The following completed in 24.2 seconds.
SELECT passenger_count,
count(*) trips
FROM trips
GROUP BY passenger_count,
date_histogram(field='pickup_datetime',
'interval'='year',
'alias'='year')
Note: the year will be included in the results after the count; it's just not declared in the SELECT statement due to the way the query grammar is structured in the SQL plugin for Elasticsearch.
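Under the hood a query like this maps onto Elasticsearch's native aggregation DSL: a terms aggregation with a date_histogram sub-aggregation. The bucket names in this sketch are my own:

$ curl -XPOST 'localhost:9200/trips/_search?pretty' -d '{
      "size": 0,
      "aggs": {
          "by_passenger_count": {
              "terms": {"field": "passenger_count"},
              "aggs": {
                  "by_year": {
                      "date_histogram": {
                          "field": "pickup_datetime",
                          "interval": "year"
                      }
                  }
              }
          }
      }
  }'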