Posted on Wed 25 January 2017

1.1 Billion Taxi Rides on kdb+/q & 4 Xeon Phi CPUs

Q is a programming language with a built-in, column-oriented, in-memory and on-disk database called kdb+. Q both includes and extends SQL. Q is native to the database engine so, unlike most databases, there is no shipping of data between the client and the server.

Q and kdb+ are shipped as a single binary that has a small memory footprint and is capable of running in the L2 and L3 caches of modern CPUs, which helps make the system very performant. Its tables can be stored on a local disk or distributed across machines but will nonetheless appear as a single table. Table data can be partitioned and segmented into memory-mapped files, which helps remove I/O bottlenecks.

The earliest version of kdb+ was developed by Kx Systems in the 1990s. The Q language originally grew out of their K programming language which, up until 1998, was exclusively licensed to the Swiss bank UBS. These days the software is popular with quantitative analysts doing pre- and post-trade analytics and back testing, and with stock exchanges doing realtime surveillance for insider trading and compliance purposes. A sizeable portion of users also use it for realtime streaming analytics such as intra-day tick analysis.

kdb+/q is commercial software but the good thing is that the 32-bit version can be downloaded for free. The x86 version for Linux is a 587 KB binary that can run without any real setup process on most flavours of Linux. There is also support for Solaris, Windows and Mac OSX.

kdb+/q has a reputation among its users for being fast, so I wanted to try it for myself, and the results turned out to be interesting. In this blog post I'll see how fast kdb+/q can query 1.1 billion taxi rides. I'll be using the same dataset I've used to benchmark Amazon Athena, BigQuery, Elasticsearch, EMR, MapD, PostgreSQL and Redshift. I've compiled a single-page summary of these benchmarks.

The Hardware

For this benchmark I'll be using four CentOS 7.2.1511 servers, each with a Knights Landing-based Intel Xeon Phi 7210 CPU clocked at 1.3 GHz. While the clock speed may not be much to write home about, the chip comes with 64 cores, each with 4 threads. This isn't dissimilar to using graphics cards which often come with relatively slow compute cores but many of them.

The 7210 has a vector processing unit with 512-bit registers bringing its memory bus width in line with those found on high-end GPUs.

As of November 2016, 3 of the top 6 fastest supercomputers in the world are using Xeon Phi chips.

$ lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                256
On-line CPU(s) list:   0-255
Thread(s) per core:    4
Core(s) per socket:    64
Socket(s):             1
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 87
Model name:            Intel(R) Xeon Phi(TM) CPU 7210 @ 1.30GHz
Stepping:              1
CPU MHz:               1008.769
BogoMIPS:              2594.11
L1d cache:             32K
L1i cache:             32K
L2 cache:              1024K
NUMA node0 CPU(s):     0-255
NUMA node1 CPU(s):
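
As a quick sanity check, the thread counts can be derived from the lscpu figures above. This is a minimal sketch: the values are copied by hand from that output and the variable names are my own.

```shell
#!/bin/sh
# Values copied from the lscpu output above.
sockets=1
cores_per_socket=64
threads_per_core=4
servers=4   # number of machines used in this benchmark

# Logical CPUs on one server, then across the whole cluster.
per_server=$((sockets * cores_per_socket * threads_per_core))
cluster=$((per_server * servers))

echo "logical CPUs per server: $per_server"
echo "logical CPUs in cluster: $cluster"
```

The per-server figure matches the "CPU(s)" line reported by lscpu.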

For this benchmark I'll put the CPU into performance mode.

$ sudo cpupower frequency-set -g performance
$ sudo cpupower frequency-info
analyzing CPU 0:
  driver: intel_pstate
  CPUs which run at the same hardware frequency: 0
  CPUs which need to have their frequency coordinated by software: 0
  maximum transition latency: 0.97 ms.
  hardware limits: 1000 MHz - 1.50 GHz
  available cpufreq governors: performance, powersave
  current policy: frequency should be within 1000 MHz and 1.50 GHz.
                  The governor "performance" may decide which speed to use
                  within this range.
  current CPU frequency is 1.39 GHz (asserted by call to hardware).
  boost state support:
    Supported: yes
    Active: yes

Each Xeon Phi CPU has 16 GB of on-package, 3D-stacked, high bandwidth memory called Multi-Channel DRAM (MCDRAM). In addition to this there's 96 GB of DDR4 RAM installed as 6 x 16 GB DIMMs on each server. These combine to give a total of 112 GB of addressable, physical memory on each machine.

The MCDRAM operates at up to 400 GB/s whereas the DDR4 should be able to reach 90 GB/s.

$ numactl -H | grep size
node 0 size: 98207 MB
node 1 size: 16384 MB
$ cat /proc/meminfo | head -n1
MemTotal:       115392380 kB
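
The figures above can be reconciled with a little arithmetic. The following is a minimal sketch that uses only the numbers printed above; kb_to_gib is a helper name I've made up for illustration.

```shell
#!/bin/sh
# Figures copied from the numactl and /proc/meminfo output above.
node0_mb=98207         # NUMA node 0 (DDR4)
node1_mb=16384         # NUMA node 1 (MCDRAM)
memtotal_kb=115392380

# Sum of both NUMA nodes in MB.
total_mb=$((node0_mb + node1_mb))

# Hypothetical helper: convert kB to GiB, rounded to one decimal place.
kb_to_gib() {
    awk -v kb="$1" 'BEGIN { printf "%.1f\n", kb / 1024 / 1024 }'
}

memtotal_gib=$(kb_to_gib "$memtotal_kb")

echo "NUMA nodes combined: ${total_mb} MB"
echo "MemTotal:            ${memtotal_gib} GiB"
```

Node 1's 16,384 MB is exactly the 16 GB of MCDRAM. MemTotal comes in a little under the 112 GB installed, most likely because the kernel sets some memory aside for itself.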

In the BIOS I've set the memory mode to flat. Normally, MCDRAM is presented as a cache but in flat mode it becomes part of the addressable memory, which is why it shows up above as its own NUMA node.

The Xeon Phi processor lets you choose between different non-uniform memory access (NUMA) cluster modes. The three options available are "all to all", "snc4" and "quadrant". For this exercise I've set the cluster mode to quadrant. This means the cores on each CPU are broken up into four groups. Anything stored by a thread on MCDRAM will be on MCDRAM dedicated to that quadrant. This lowers the time it takes to figure out which portion of the 16 GB of MCDRAM a thread is likely to find its data stored in.

$ sudo bios-query
BIOS Version............. S72C610.86B.01.01.0208
Cluster Mode:   Quadrant
Memory Mode:    Flat

The drives on each server are Intel Data Center S3710 Series SSDs with 1.2 TB of unformatted capacity. They support up to 85,000 IOPS of random 4K reads and they have a sequential read speed of up to 550 MB/s.

The drives have 500 MB boot and 100 GB swap partitions in addition to the remaining 1,017 GB, xfs-formatted root partitions.

$ lsblk
NAME   MAJ:MIN RM  SIZE RO TYPE MOUNTPOINT
sda      8:0    0  1.1T  0 disk
├─sda1   8:1    0  500M  0 part /boot
├─sda2   8:2    0  100G  0 part [SWAP]
└─sda3   8:3    0 1017G  0 part /

Each server has two Intel I210 gigabit network adaptors.

Each of these servers sits in a sled with the four sleds sat side-by-side in a single, 2U chassis.

kdb+/q Up and Running

If you want to run the 32-bit version of the software you'll most likely need to install the following dependencies on any 64-bit CentOS 7 system:

$ curl -O http://www.mirrorservice.org/sites/dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
$ sudo rpm -Uvh epel-release-6-8.noarch.rpm
$ sudo yum install \
    libstdc++.i686 \
    rlwrap \
    unzip

The following will be needed for a 64-bit Ubuntu 14.04.3 LTS-based system:

$ sudo apt-get install \
    gcc-multilib \
    rlwrap \
    unzip

After that you can open up the download page and get the Linux x86 distribution of kdb+/q.

$ open https://kx.com/download/
$ unzip linuxx86.zip
$ PATH=$PATH:`pwd`/q/l32
$ export PATH

After that you should be able to launch kdb+/q:

$ q
KDB+ 3.4 2016.12.08 Copyright (C) 1993-2016 Kx Systems
l32/ 2()core 3002MB mark ubuntu 127.0.1.1 NONEXPIRE

Welcome to kdb+ 32bit edition
For support please see http://groups.google.com/d/forum/personal-kdbplus
Tutorials can be found at http://code.kx.com/wiki/Tutorials
To exit, type \\
To remove this startup msg, edit q.q

To exit type: \\

q)\\

Kx Systems have a concise reference wiki that I find really helpful.

Loading 1.1 Billion Trips into kdb+/q

The dataset I'm working with was originally put together from various sources with scripts written by Todd W Schneider. In my Billion Taxi Rides in Redshift blog post I exported the data from PostgreSQL into denormalised CSV files and compressed them into 56 gzip files. The gzip files are 104 GB in size and when decompressed take up around 500 GB of space.

Four servers will store and process this dataset: kxknl1, kxknl2, kxknl3 and kxknl4. Each server has a copy of the gzip-compressed dataset in a folder called csv.

The load.q script will define a schema that kdb+/q will use. The data types for each column are given as single character representations. Ordered by their first appearance: I is a 32-bit integer, * is a string (a list of characters), P is a timestamp, B is a boolean, X is a byte, E is a float, H is a 16-bit integer and C is a character.

The letters for each data type in kdb+/q aren't picked at random; they're mnemonic codes: B "boolean", C "char", D "date", E "float", F "double", G "GUID", H "signed, 16-bit integer", I "signed, 32-bit integer", J "signed, 64-bit integer", etc. Kx's documentation has a full list of supported data types.

$ cat load.q
m:(`trip_id              ;"I";
   `vendor_id            ;"*";
   `pickup_datetime      ;"P";
   `dropoff_datetime     ;"P";
   `store_and_fwd_flag   ;"B";
   `rate_code_id         ;"X";
   `pickup_longitude     ;"E";
   `pickup_latitude      ;"E";
   `dropoff_longitude    ;"E";
   `dropoff_latitude     ;"E";
   `passenger_count      ;"X";
   `trip_distance        ;"E";
   `fare_amount          ;"E";
   `extra                ;"E";
   `mta_tax              ;"E";
   `tip_amount           ;"E";
   `tolls_amount         ;"E";
   `ehail_fee            ;"E";
   `improvement_surcharge;"E";
   `total_amount         ;"E";
   `payment_type         ;"*";
   `trip_type            ;"X";
   `pickup               ;"*";
   `dropoff              ;"*";
   `cab_type             ;"*";
   `precipitation        ;"H";
   `snow_depth           ;"H";
   `snowfall             ;"H";
   `max_temperature      ;"H";
   `min_temperature      ;"H";
   `average_wind_speed   ;"H";
   `pickup_nyct2010_gid  ;"H";
   `pickup_ctlabel       ;"*";
   `pickup_borocode      ;"H";
   `pickup_boroname      ;"*";
   `pickup_ct2010        ;"*";
   `pickup_boroct2010    ;"*";
   `pickup_cdeligibil    ;"C";
   `pickup_ntacode       ;"*";
   `pickup_ntaname       ;"*";
   `pickup_puma          ;"*";
   `dropoff_nyct2010_gid ;"H";
   `dropoff_ctlabel      ;"*";
   `dropoff_borocode     ;"H";
   `dropoff_boroname     ;"*";
   `dropoff_ct2010       ;"*";
   `dropoff_boroct2010   ;"*";
   `dropoff_cdeligibil   ;"C";
   `dropoff_ntacode      ;"*";
   `dropoff_ntaname      ;"*";
   `dropoff_puma         ;"*")
@[`.;`c`t;:;flip 0N 2#m];
f:`$":",x:.z.x 0;n:2#-9#x
system"rm -f fifo",n," && mkfifo fifo",n;
system"zcat ",(1_string f)," >fifo",n," &";
td:{` sv`:trips,(`$string x),`trips`}
i:-1;
wr:{td[(`$n),(i+:1),y]set update `p#passenger_count from delete year from select from x where year=y}
rd:.Q.fc[{flip c!(t;",")0:x}];sr:{`year`passenger_count xasc update year:pickup_datetime.year,first each cab_type from x}
\t .Q.fpn[{wr[q]each exec distinct year from q:update `p#year from sr rd x};`$":fifo",n;550000000]

The load.q file will be used by a load.sh script during the loading process (explained further down) but first I'll explain the last nine lines of load.q.

@[`.;`c`t;:;flip 0N 2#m];

The above massages the schema into two variables, c will contain the column names and t will contain the column data types.

f:`$":",x:.z.x 0;n:2#-9#x

The above receives the path of the CSV file as its first argument via ".z.x 0". The variable f will contain the file handle for the CSV file and n will be populated with the first two of the last nine characters of the file name, which are the two letters that identify the file (e.g. aa in trips_xaa.csv.gz).
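
To make the 2#-9#x step concrete, here is the same slicing done in shell. It's a minimal sketch: the path is an assumed example, and the nine-character tail only lines up with the two-letter identifier if the file name ends in .csv.gz.

```shell
#!/bin/sh
# Assumed example path; the real files live in the csv folder on each server.
x="csv/trips_xaa.csv.gz"

# Take the last nine characters ("aa.csv.gz"), then the first two of those.
n=$(printf '%s' "$x" | tail -c 9 | cut -c 1-2)

echo "$n"
```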

system"rm -f fifo",n," && mkfifo fifo",n;
system"zcat ",(1_string f)," >fifo",n," &";

FIFOs are a feature of Linux that allow a named pipe to be accessed as part of the file system. The above will create a FIFO which will act as a first-in, first-out queue for the decompressed data we will pipe through it. Then zcat will decompress the gzip file and pipe the resulting CSV data into the recreated FIFO.
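
The same pattern can be tried outside of kdb+/q. The following is a minimal sketch that runs in a throwaway directory; the file and FIFO names are made up, the CSV is a three-line stand-in for the real data, and gzip -dc is used in place of zcat (the two are equivalent).

```shell
#!/bin/sh
# Work in a scratch directory so nothing outside it is touched.
work=$(mktemp -d)
cd "$work" || exit 1

# Build a tiny gzip-compressed CSV to stand in for one of the real files.
printf 'id,cab_type\n1,yellow\n2,green\n' | gzip > sample.csv.gz

# Recreate the named pipe, then decompress into it in the background.
rm -f fifo_demo && mkfifo fifo_demo
gzip -dc sample.csv.gz > fifo_demo &

# The reader sees plain CSV; no decompressed copy is written to disk.
lines=$(wc -l < fifo_demo | tr -d ' ')
wait

echo "read $lines lines through the FIFO"

# Clean up the scratch directory.
cd / && rm -rf "$work"
```

The writer blocks until a reader opens the FIFO, which is why the decompression has to be started in the background before anything reads from it.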

td:{` sv`:trips,(`$string x),`trips`}
i:-1;
wr:{td[(`$n),(i+:1),y]set update `p#passenger_count from delete year from select from x where year=y}
rd:.Q.fc[{flip c!(t;",")0:x}]
sr:{`year`passenger_count xasc update year:pickup_datetime.year,first each cab_type from x}

The above defines four functions: td returns the table handle for a given segment/partition; wr takes an in-memory partition and writes it to disk, indexing the passenger_count column with the parted attribute (`p#) in the process; rd parses CSV data in parallel into an in-memory table; and sr adds a year column and sorts an in-memory table by year and passenger_count.

\t .Q.fpn[{wr[q]each exec distinct year from q:update `p#year from sr rd x};`$":fifo",n;550000000]

The above both times and triggers the process of reading data from the FIFO, parsing it into an in-memory table, sorting it and writing it out to an on-disk table.

Each of the four servers will have a copy of load.q alongside a load.sh script. When executed, load.sh will take the last digit of the host name (a value between 1 and 4) and use that to decide which quarter of the 56 gzip files will be loaded onto its own system. This means the dataset will be spread amongst the four machines with none of them holding on to more than 25% of the dataset. It is possible to share the data across all nodes but I chose to not take that approach for this benchmark.

The xargs command will be used to load the 14 files specific to each server concurrently.

Once the data has been loaded in, a partitions manifest file is created and stored in trips/p/par.txt.

$ cat load.sh
#!/bin/sh

id=`hostname | grep -o '.$'`

ls csv/* | \
    awk -v i=$id 'NR > 14 * (i - 1) && NR <= 14 * i' | \
        xargs \
          -P 14 \
          -L 1 \
          q load.q \
            -q \
            -s 8

mkdir -p trips/p

ls -d trips/??/* | \
    sed 's/trips/../' > trips/p/par.txt
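
To see which files a given host ends up with, the host-to-slice logic from load.sh can be run on its own. This is a minimal sketch: files_for_host is a helper name I've made up, and the numbers 1 to 56 stand in for the sorted listing of the csv folder.

```shell
#!/bin/sh
# Hypothetical helper: print the 1-based positions of the files a host loads.
# The numbers 1 to 56 stand in for the sorted listing of the csv folder.
files_for_host() {
    seq 1 56 | awk -v i="$1" 'NR > 14 * (i - 1) && NR <= 14 * i'
}

# The last character of the host name picks the slice, as in load.sh.
id=$(echo "kxknl3" | grep -o '.$')

first=$(files_for_host "$id" | head -n 1)
last=$(files_for_host "$id" | tail -n 1)
count=$(files_for_host "$id" | wc -l | tr -d ' ')

echo "host $id loads files $first to $last ($count files)"
```

Each host gets a contiguous block of 14 files, so no file is loaded by more than one machine.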

The load.sh script was executed concurrently across all four servers. The loading of the CSV data into kdb+'s internal format took about 30 minutes altogether. This is one of the fastest load times I've seen of this dataset.

As compression isn't being used for this benchmark, ~125 GB of disk capacity is being used on each machine (excluding source files).

The par.txt file lists the directories of the segments that comprise the trips table.

$ head -n20 trips/p/par.txt
../aa/0
../aa/1
../aa/10
../aa/11
../aa/12
../aa/13
../aa/14
../aa/15
../aa/16
../aa/2
../aa/3
../aa/4
../aa/5
../aa/6
../aa/7
../aa/8
../aa/9
../ab/0
../ab/1
../ab/10

So to recap, I've imported the 56 CSV files into 960 partitions. There are a total of 1,024 threads across all four CPUs so there should be a thread for each partition when I run my queries.

Benchmarking kdb+/q

For this benchmark I'll be running the 64-bit commercial version of kdb+/q. The following SQL commands were executed from the kdb+/q binary on the first node. Each server processed a quarter of each job and the results were returned to the REPL interface the SQL was originally executed from.

Before launching any processes I'll flush the kernel's buffer cache. This way I can be sure the data isn't pre-loaded before I begin benchmarking.

$ echo 3 | sudo tee /proc/sys/vm/drop_caches

kdb+/q will then be launched on each of the four machines with MCDRAM being the preferred memory to use and the DDR4 RAM as spill-over. The -s 256 flag sets the number of slaves kdb+/q will use for parallel execution (on each machine) and the -p 5001 flag sets the TCP port number to listen on.

$ sudo numactl --preferred 1 \
    q \
    trips/p \
    -s 256 \
    -p 5001 \
    &> slave.log

The following will launch kdb+/q as a coordinator on kxknl1 and open a connection to all four nodes. The coordinator will execute the SQL queries and pass out the work to all four nodes.

I've prefixed the q binary with rlwrap so that I can have readline history in the REPL that I can cycle through with the arrow keys.

$ rlwrap q startmaster.q -s -1

The above will first execute a startmaster.q script before exposing the REPL.

$ cat startmaster.q
k).Q.p:{$[~#.Q.D;.Q.p2[x;`:.]':y;(,/(,/.Q.p2[x]'/':)':(#.z.pd;0N)#.Q.P[i](;)'y)@<,/y@:i:&0<#:'y:.Q.D{x@&x in y}\:y]}

\l /home/mark/trips/all/p

.z.pd:`u#hopen each `:kxknl1:5001`:kxknl2:5001`:kxknl3:5001`:kxknl4:5001

I chose to set up the DB with one Linux process per KNL node, using many Linux threads of execution within each kdb+ process/slave. As I needed to distribute the query in parallel across all segments and not just across these Linux processes, I asked Kx Systems for the best approach for this. The first line in startmaster.q will instruct kdb+/q to distribute the workload around all the slaves of each of the Linux processes listening on ports as indicated in line 3 of startmaster.q. This makes sure the queries are processed across all segments of the dataset, in parallel. The second line will load the root of the database. The third line will grab the handles for each of the peach threads and open up TCP connections to all four nodes.

The times quoted below are the lowest query times seen during a series of runs. As with all my benchmarks, I use lowest query time as a way of indicating "top speed".

The following completed in 0.051 seconds.

select count 1b by cab_type from trips

The following completed in 0.146 seconds.

select avg total_amount by passenger_count from trips

The following completed in 0.047 seconds.

select count 1b by year, passenger_count from trips

The following completed in 0.794 seconds.

select count 1b by year, passenger_count, floor trip_distance from trips

These query times are by far the fastest I've ever seen on any CPU-based system and the third query finished faster than on any other system I've ever benchmarked before.

It's amazing to see what is possible when data locality is as optimised as it is in this setup. kdb+/q does an amazing job of breaking up the workload amongst the 1,024 CPU threads in the cluster. Having 16 GB of MCDRAM that is just so much faster than regular DDR4 opens up a whole new world of optimisations.

It's also refreshing to not have a large configuration overhead when launching kdb+/q. A binary that just works well right off the bat and can be tweaked with a couple of flags is a welcome relief from hours of tuning disparate configuration files spread all over a cluster.

It's great to see such strong performance from the Xeon Phi CPUs as well. These are early days in the battle between GPUs and CPUs for supremacy in OLAP workloads. It will be interesting to see how it plays out.

Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2017 Mark Litwintschik.