
Posted on Sun 20 October 2019 under ClickHouse

Faster ClickHouse Imports

ClickHouse is the workhorse of many services at Yandex and several other large Internet firms in Russia. These companies serve an audience of 166 million Russian speakers worldwide and have some of the greatest demands for distributed OLAP systems in Europe.

This year has seen good progress in ClickHouse's development and stability. Support has been added for HDFS, ZFS and Btrfs, both for reading datasets and for storing table data. Other additions include a T64 codec which can significantly improve ZStandard compression, faster LZ4 performance and tiered storage.

Anyone uncomfortable with the number of moving parts in a typical Hadoop setup might find reassurance in ClickHouse being a single piece of software rather than a loose collection of several different projects. For anyone unwilling to pay for Cloud hosting, ClickHouse can run off a laptop running macOS; paired with Python and Tableau there would be little reason to connect to the outside world for most analytical operations. Being written in C++ means there are no JVM configurations to consider when running in standalone mode.

ClickHouse relies heavily on third-party libraries, which helps keep the C++ code base at ~300K lines of code. By contrast, PostgreSQL's current master branch has about 800K lines of C code and MySQL has 2M lines of C++. Thirteen developers have made at least 100 commits to the project this year. PostgreSQL has had only five developers reach the same target and MySQL only two. ClickHouse's engineers have managed to deliver a new release every ten days on average for the past 2.5 years.

In this post I'm going to benchmark several ways of importing data into ClickHouse.

ClickHouse Up & Running

ClickHouse supports clustering but for the sake of simplicity I'll be using a single machine for these benchmarks. The machine in question has an Intel Core i5 4670K clocked at 3.4 GHz, 8 GB of DDR3 RAM and a SanDisk SDSSDHII960G 960 GB SSD connected via a SATA interface. The machine is running a fresh installation of Ubuntu 16.04.2 LTS and I'll be running version 19.15.3.6 of ClickHouse.

The dataset used in this post is the first 80 million records from my 1.1 Billion Taxi Ride Benchmarks. The steps taken to produce this dataset are described here. The 80 million lines are broken up into 4 files of 20 million lines each. Each file has been produced in three formats: uncompressed CSV totalling 36.4 GB, GZIP-compressed CSV totalling 7.5 GB and Snappy-compressed Parquet totalling 7.6 GB.

Below I'll install ClickHouse 19.15.3.6, MySQL 5.7.27, PostgreSQL 11.5, OpenJDK and ZooKeeper for Kafka, and Pigz, a parallel GZIP implementation.

$ sudo apt-key adv \
    --keyserver hkp://keyserver.ubuntu.com:80 \
    --recv E0C56BD4
$ wget -qO- https://www.postgresql.org/media/keys/ACCC4CF8.asc \
    | sudo apt-key add -
$ echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" \
    | sudo tee /etc/apt/sources.list.d/clickhouse.list
$ echo "deb http://apt.postgresql.org/pub/repos/apt/ xenial-pgdg main" \
    | sudo tee /etc/apt/sources.list.d/pgdg.list
$ sudo apt update
$ sudo apt install \
    clickhouse-client \
    clickhouse-server \
    mysql-server \
    openjdk-8-jre \
    openjdk-8-jdk-headless \
    pigz \
    postgresql \
    postgresql-contrib \
    zookeeperd

I'll be using TCP port 9000 for Hadoop's Name Node later in this post so I'll move ClickHouse's server port from 9000 to 9001.

$ sudo vi /etc/clickhouse-server/config.xml

Below is the line I changed in the file above.

<tcp_port>9001</tcp_port>

I'll then launch the ClickHouse Server.

$ sudo service clickhouse-server start
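The port change made before launching the server could also be scripted rather than typed into vi. The sketch below is only an illustration, not what was run for this post: it practises the substitution on a scratch file with hypothetical contents standing in for /etc/clickhouse-server/config.xml, so nothing under /etc is touched.

```shell
# Scratch file standing in for /etc/clickhouse-server/config.xml (hypothetical content).
CONFIG=$(mktemp)
printf '<yandex>\n    <tcp_port>9000</tcp_port>\n</yandex>\n' > "$CONFIG"

# Rewrite the native protocol port from 9000 to 9001, then swap the file into place.
sed 's|<tcp_port>9000</tcp_port>|<tcp_port>9001</tcp_port>|' "$CONFIG" > "$CONFIG.new"
mv "$CONFIG.new" "$CONFIG"

# Show the resulting setting.
grep tcp_port "$CONFIG"
```

Against the real file the same sed expression would need to be run with sudo, and the server restarted afterwards.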

I'll save the password used to connect to the server in a configuration file along with the non-standard TCP port. This will save me having to specify these each time I connect using the ClickHouse Client.

$ mkdir -p ~/.clickhouse-client
$ vi ~/.clickhouse-client/config.xml
<config>
    <port>9001</port>
    <password>root</password>
</config>

Throughout this post I'll be importing data into a ClickHouse table which uses the "Log" engine, a lightweight engine that keeps each column in its own file and doesn't build any indices.

$ clickhouse-client
CREATE TABLE trips (
    trip_id                 UInt32,
    vendor_id               String,

    pickup_datetime         DateTime,
    dropoff_datetime        Nullable(DateTime),

    store_and_fwd_flag      Nullable(FixedString(1)),
    rate_code_id            Nullable(UInt8),
    pickup_longitude        Nullable(Float64),
    pickup_latitude         Nullable(Float64),
    dropoff_longitude       Nullable(Float64),
    dropoff_latitude        Nullable(Float64),
    passenger_count         Nullable(UInt8),
    trip_distance           Nullable(Float64),
    fare_amount             Nullable(Float32),
    extra                   Nullable(Float32),
    mta_tax                 Nullable(Float32),
    tip_amount              Nullable(Float32),
    tolls_amount            Nullable(Float32),
    ehail_fee               Nullable(Float32),
    improvement_surcharge   Nullable(Float32),
    total_amount            Nullable(Float32),
    payment_type            Nullable(String),
    trip_type               Nullable(UInt8),
    pickup                  Nullable(String),
    dropoff                 Nullable(String),

    cab_type                Nullable(String),

    precipitation           Nullable(Int8),
    snow_depth              Nullable(Int8),
    snowfall                Nullable(Int8),
    max_temperature         Nullable(Int8),
    min_temperature         Nullable(Int8),
    average_wind_speed      Nullable(Int8),

    pickup_nyct2010_gid     Nullable(Int8),
    pickup_ctlabel          Nullable(String),
    pickup_borocode         Nullable(Int8),
    pickup_boroname         Nullable(String),
    pickup_ct2010           Nullable(String),
    pickup_boroct2010       Nullable(String),
    pickup_cdeligibil       Nullable(FixedString(1)),
    pickup_ntacode          Nullable(String),
    pickup_ntaname          Nullable(String),
    pickup_puma             Nullable(String),

    dropoff_nyct2010_gid    Nullable(UInt8),
    dropoff_ctlabel         Nullable(String),
    dropoff_borocode        Nullable(UInt8),
    dropoff_boroname        Nullable(String),
    dropoff_ct2010          Nullable(String),
    dropoff_boroct2010      Nullable(String),
    dropoff_cdeligibil      Nullable(String),
    dropoff_ntacode         Nullable(String),
    dropoff_ntaname         Nullable(String),
    dropoff_puma            Nullable(String)
) ENGINE = Log;

Importing From MySQL

I'll first set up a login and password for MySQL.

$ sudo su
$ mysql -uroot -p
CREATE USER 'mark'@'localhost' IDENTIFIED BY 'test';
GRANT ALL PRIVILEGES ON *.* TO 'mark'@'localhost';
FLUSH PRIVILEGES;
$ exit

Then I'll create a database and a table that will store the 80 million trips dataset.

$ mysql -umark -p
CREATE DATABASE trips;
USE trips

CREATE TABLE trips (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         DATETIME,
    dropoff_datetime        DATETIME,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(18,6),
    fare_amount             DECIMAL(18,6),
    extra                   DECIMAL(18,6),
    mta_tax                 DECIMAL(18,6),
    tip_amount              DECIMAL(18,6),
    tolls_amount            DECIMAL(18,6),
    ehail_fee               DECIMAL(18,6),
    improvement_surcharge   DECIMAL(18,6),
    total_amount            DECIMAL(18,6),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4),

    PRIMARY KEY (trip_id)
);

I'll then import the four CSV files containing the 80 million trips into MySQL. Note I'm sourcing the data from the uncompressed CSVs.

LOAD DATA LOCAL INFILE '/home/mark/000000_0.csv'
INTO TABLE trips
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';

LOAD DATA LOCAL INFILE '/home/mark/000000_1.csv'
INTO TABLE trips
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';

LOAD DATA LOCAL INFILE '/home/mark/000000_2.csv'
INTO TABLE trips
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';

LOAD DATA LOCAL INFILE '/home/mark/000000_3.csv'
INTO TABLE trips
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';

The above took 20 minutes and 23 seconds to complete.
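Timings like the one above can be captured in a number of ways. The following is a minimal sketch, assuming a hypothetical helper named timed that wraps any command and reports wall-clock time in minutes and seconds. It is not necessarily how the figures in this post were measured.

```shell
# timed LABEL COMMAND...: run COMMAND and report its wall-clock duration as M:SS.
timed() {
    local label=$1
    shift
    local start end elapsed
    start=$(date +%s)
    "$@"                                  # run the wrapped command as given
    end=$(date +%s)
    elapsed=$(( end - start ))
    printf '%s took %d:%02d\n' "$label" $(( elapsed / 60 )) $(( elapsed % 60 ))
}

# Harmless stand-in for a long-running import.
timed "demo" sleep 1
```

Any of the import commands in this post could be passed in place of sleep 1; bash's built-in time keyword is an alternative when sub-second resolution matters.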

I'll launch ClickHouse's Client, create a database pointing to the trips database on MySQL and then import the data from its trips table into a ClickHouse table which will store the records using the "Log" engine.

$ clickhouse-client
CREATE DATABASE mysql_db
ENGINE = MySQL('localhost:3306', 'trips', 'mark', 'test');

INSERT INTO trips
SELECT * FROM mysql_db.trips;

The above took 10 minutes and 57 seconds. MySQL's internal format needed 42 GB of space to store the dataset. The dataset is 9.9 GB when kept in ClickHouse's internal Log engine format. During the import I could see ClickHouse using 50% of a CPU core and MySQL needing 2.75 CPU cores of capacity. The disk throughput required never exceeded 20% of what the SSD is capable of.

Importing From JSON

I'll dump the 80 million rows in ClickHouse to JSON.

$ clickhouse-client --query='SELECT * FROM trips FORMAT JSONEachRow' \
    > out.json

The resulting JSON file is 100 GB in size and took 9 minutes and 22 seconds to produce.

Here is a sampling of the first three rows.

$ head -n3 out.json
{"trip_id":471162830,"vendor_id":"CMT","pickup_datetime":"2011-09-27 10:19:11","dropoff_datetime":"2011-09-27 10:24:19","store_and_fwd_flag":"N","rate_code_id":1,"pickup_longitude":-73.994426,"pickup_latitude":40.745873,"dropoff_longitude":-74.005886,"dropoff_latitude":40.745395,"passenger_count":1,"trip_distance":0.8,"fare_amount":4.9,"extra":0,"mta_tax":0.5,"tip_amount":0,"tolls_amount":0,"ehail_fee":0,"improvement_surcharge":0,"total_amount":5.4,"payment_type":"CSH","trip_type":0,"pickup":"1156","dropoff":"2098","cab_type":"yellow","precipitation":0,"snow_depth":0,"snowfall":0,"max_temperature":-6,"min_temperature":-50,"average_wind_speed":16,"pickup_nyct2010_gid":-124,"pickup_ctlabel":"91","pickup_borocode":1,"pickup_boroname":"Manhattan","pickup_ct2010":"009100","pickup_boroct2010":"1009100","pickup_cdeligibil":"I","pickup_ntacode":"MN13","pickup_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","pickup_puma":"3807","dropoff_nyct2010_gid":50,"dropoff_ctlabel":"99","dropoff_borocode":1,"dropoff_boroname":"Manhattan","dropoff_ct2010":"009900","dropoff_boroct2010":"1009900","dropoff_cdeligibil":"I","dropoff_ntacode":"MN13","dropoff_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","dropoff_puma":"3807"}
{"trip_id":471162831,"vendor_id":"VTS","pickup_datetime":"2011-09-15 17:14:00","dropoff_datetime":"2011-09-15 17:48:00","store_and_fwd_flag":"\u0000","rate_code_id":1,"pickup_longitude":-73.968207,"pickup_latitude":40.752457,"dropoff_longitude":-74.008213,"dropoff_latitude":40.74876,"passenger_count":1,"trip_distance":3.15,"fare_amount":17.7,"extra":1,"mta_tax":0.5,"tip_amount":0,"tolls_amount":0,"ehail_fee":0,"improvement_surcharge":0,"total_amount":19.2,"payment_type":"CSH","trip_type":0,"pickup":"1432","dropoff":"2098","cab_type":"yellow","precipitation":18,"snow_depth":0,"snowfall":0,"max_temperature":-12,"min_temperature":117,"average_wind_speed":21,"pickup_nyct2010_gid":-104,"pickup_ctlabel":"90","pickup_borocode":1,"pickup_boroname":"Manhattan","pickup_ct2010":"009000","pickup_boroct2010":"1009000","pickup_cdeligibil":"I","pickup_ntacode":"MN19","pickup_ntaname":"Turtle Bay-East Midtown","pickup_puma":"3808","dropoff_nyct2010_gid":50,"dropoff_ctlabel":"99","dropoff_borocode":1,"dropoff_boroname":"Manhattan","dropoff_ct2010":"009900","dropoff_boroct2010":"1009900","dropoff_cdeligibil":"I","dropoff_ntacode":"MN13","dropoff_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","dropoff_puma":"3807"}
{"trip_id":471162832,"vendor_id":"VTS","pickup_datetime":"2011-09-25 12:55:00","dropoff_datetime":"2011-09-25 13:05:00","store_and_fwd_flag":"\u0000","rate_code_id":1,"pickup_longitude":-73.99651,"pickup_latitude":40.72563,"dropoff_longitude":-74.007203,"dropoff_latitude":40.75135,"passenger_count":1,"trip_distance":2.48,"fare_amount":8.5,"extra":0,"mta_tax":0.5,"tip_amount":1,"tolls_amount":0,"ehail_fee":0,"improvement_surcharge":0,"total_amount":10,"payment_type":"CRD","trip_type":0,"pickup":"1471","dropoff":"2098","cab_type":"yellow","precipitation":0,"snow_depth":0,"snowfall":0,"max_temperature":11,"min_temperature":-45,"average_wind_speed":4,"pickup_nyct2010_gid":-65,"pickup_ctlabel":"55.02","pickup_borocode":1,"pickup_boroname":"Manhattan","pickup_ct2010":"005502","pickup_boroct2010":"1005502","pickup_cdeligibil":"I","pickup_ntacode":"MN23","pickup_ntaname":"West Village","pickup_puma":"3810","dropoff_nyct2010_gid":50,"dropoff_ctlabel":"99","dropoff_borocode":1,"dropoff_boroname":"Manhattan","dropoff_ct2010":"009900","dropoff_boroct2010":"1009900","dropoff_cdeligibil":"I","dropoff_ntacode":"MN13","dropoff_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","dropoff_puma":"3807"}

I'll split the JSON file into four files so that I can attempt parallel imports of the JSON data.

$ split --lines=20000000 out.json

The above operation produced four files of 25 GB each.

$ ls -alht xa*
-rw-rw-r-- 1 mark mark 25G Oct 15 04:13 xad
-rw-rw-r-- 1 mark mark 25G Oct 15 04:09 xac
-rw-rw-r-- 1 mark mark 25G Oct 15 04:06 xab
-rw-rw-r-- 1 mark mark 25G Oct 15 04:02 xaa
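The naming and sizing behaviour of split is easy to confirm on a small scale. This sketch uses a throwaway 10-line file (sample.json is a made-up name) and the short -l form of --lines.

```shell
# Build a 10-line sample file in a scratch directory.
WORKDIR=$(mktemp -d)
cd "$WORKDIR"
seq 1 10 > sample.json

# Cut it into chunks of at most 4 lines; split names them xaa, xab, xac, ...
split -l 4 sample.json

# Line counts per chunk: two full chunks of 4 lines and a 2-line remainder.
wc -l xa*
```

With 80 million lines and a chunk size of 20 million there is no remainder, which is why exactly four equally sized files appear above.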

I'll truncate the trips table in ClickHouse and then import the above four JSON files sequentially.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ for FILENAME in xa*; do
      clickhouse-client \
          --query="INSERT INTO trips FORMAT JSONEachRow" < $FILENAME
  done

The above took 10 minutes and 11 seconds. The CPU usage during the import never exceeded 50%. The SSD achieved read speeds of 225 MB/s and writes of 40 MB/s.

I'll truncate the trips table and attempt a parallel import to see if I can reduce the import time.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ vi jobs
clickhouse-client --query=\"INSERT INTO trips FORMAT JSONEachRow\" < xaa
clickhouse-client --query=\"INSERT INTO trips FORMAT JSONEachRow\" < xab
clickhouse-client --query=\"INSERT INTO trips FORMAT JSONEachRow\" < xac
clickhouse-client --query=\"INSERT INTO trips FORMAT JSONEachRow\" < xad
$ cat jobs | xargs -n1 -P4 -I% bash -c "%"

The above attempt with four parallel processes failed with the following error.

Code: 209. DB::NetException: Timeout exceeded while reading from socket ([::1]:9000): while receiving packet from localhost:9000

I'll attempt a parallel import with two processes instead.

$ cat jobs | xargs -n1 -P2 -I% bash -c "%"

The above took 8 minutes and 43 seconds.
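The jobs-file pattern used for these parallel runs can be tried without ClickHouse. In the sketch below the echo commands are stand-ins for the clickhouse-client invocations. It also shows why the quotes in the jobs file are escaped: xargs consumes the backslashes, so bash receives ordinary double quotes.

```shell
# A stand-in jobs file: one shell command per line, quotes escaped for xargs.
JOBS=$(mktemp)
cat > "$JOBS" <<'EOF'
echo \"job one\" | tr a-z A-Z
echo \"job two\" | tr a-z A-Z
echo \"job three\" | tr a-z A-Z
EOF

# -I% passes each whole line to bash -c as a single argument;
# -P2 keeps at most two of those commands running at any one time.
xargs -P2 -I% bash -c "%" < "$JOBS"
```

The order in which the three lines are printed can vary between runs, since the jobs finish independently of one another.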

Since the SSD is showing such a high read rate I'll compress the JSON files and attempt a sequential import.

$ pigz --keep xa*

9.2 GB of GZIP-compressed JSON files were produced by the above operation. I'll then truncate the trips table and import the GZIP-compressed data.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ for FILENAME in xa*.gz; do
      gunzip -c $FILENAME \
          | clickhouse-client \
              --query="INSERT INTO trips FORMAT JSONEachRow"
  done

The above took 8 minutes and 9 seconds.
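A rough lower bound helps explain why compressing the input pays off. The sketch below assumes 1 GB = 1,024 MB and that the 225 MB/s read speed seen earlier could be sustained for the whole file. Both are simplifications.

```shell
FILE_MB=$(( 100 * 1024 ))   # 100 GB of uncompressed JSON, assuming 1 GB = 1024 MB
READ_MB_PER_SEC=225         # read speed observed during the sequential import

# Minimum time needed just to pull the uncompressed JSON off the SSD.
MIN_SECS=$(( FILE_MB / READ_MB_PER_SEC ))
printf 'at least %d:%02d to read the raw JSON\n' \
    $(( MIN_SECS / 60 )) $(( MIN_SECS % 60 ))
```

Under those assumptions, reading alone accounts for roughly three quarters of the 10 minutes and 11 seconds the sequential uncompressed import took, whereas the 9.2 GB of GZIP data needs far less time on the disk.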

Importing From PostgreSQL

I'll create credentials in PostgreSQL, a database and then a table which will store the 80 million trips.

$ sudo -u postgres \
    bash -c "psql -c \"CREATE USER mark
                       WITH PASSWORD 'test'
                       SUPERUSER;\""
$ createdb trips
$ psql trips
CREATE TABLE trips (
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(18,6),
    fare_amount             DECIMAL(18,6),
    extra                   DECIMAL(18,6),
    mta_tax                 DECIMAL(18,6),
    tip_amount              DECIMAL(18,6),
    tolls_amount            DECIMAL(18,6),
    ehail_fee               DECIMAL(18,6),
    improvement_surcharge   DECIMAL(18,6),
    total_amount            DECIMAL(18,6),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
);

I will then import the 80 million records from four CSV files sequentially.

\copy trips FROM '000000_0.csv' DELIMITER ',' CSV
\copy trips FROM '000000_1.csv' DELIMITER ',' CSV
\copy trips FROM '000000_2.csv' DELIMITER ',' CSV
\copy trips FROM '000000_3.csv' DELIMITER ',' CSV

The above completed in 11 minutes and 32 seconds. Neither the SSD nor the CPU was at capacity during the above operation. It's worth looking at tuning PostgreSQL, examining alternative import methods and seeing if parallelism could have a strong effect on import times. The resulting dataset is 31 GB in PostgreSQL's internal format.

I'll truncate the trips table in ClickHouse and import the dataset from PostgreSQL using UNIX pipes to deliver CSV data to ClickHouse.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ psql trips -c "COPY trips TO STDOUT WITH CSV" \
    | clickhouse-client --query="INSERT INTO trips FORMAT CSV"

The above took 9 minutes and 39 seconds. The CPU showed 70% of capacity being utilised while the SSD showed peaks of 60 MB/s being read and 120 MB/s being written at any one time.

Importing From Kafka

I'll install Kafka manually using the binary package distributed by one of Apache's mirrors.

$ sudo mkdir -p /opt/kafka
$ wget -c -O kafka.tgz \
      http://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
$ sudo tar xzvf kafka.tgz \
      --directory=/opt/kafka \
      --strip 1

I'll then create a log file for Kafka which will be owned by my UNIX account.

$ sudo touch /var/log/kafka.log
$ sudo chown mark /var/log/kafka.log

I'll then launch Kafka's server process.

$ sudo nohup /opt/kafka/bin/kafka-server-start.sh \
             /opt/kafka/config/server.properties \
             > /var/log/kafka.log 2>&1 &

I'll create a trips topic in Kafka.

$ /opt/kafka/bin/kafka-topics.sh \
      --create \
      --zookeeper localhost:2181 \
      --replication-factor 1 \
      --partitions 1 \
      --topic trips

I'll then import the CSV data into the trips topic in Kafka.

$ cat 000000_[0-3].csv \
      | /opt/kafka/bin/kafka-console-producer.sh \
          --topic trips \
          --broker-list localhost:9092 \
      1>/dev/null

The above took 8 minutes and 30 seconds. During that time the CPU was at 90% of capacity and the SSD achieved read and write speeds of between 70 and 100 MB/s at any one time. The resulting dataset is 36 GB in Kafka's internal format.

I'll truncate the trips table in ClickHouse, then create a table in ClickHouse which points to the trips Kafka topic and then import its contents into the trips table in ClickHouse.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ clickhouse-client
CREATE TABLE trips_kafka_csv (
    trip_id                 UInt32,
    vendor_id               String,

    pickup_datetime         DateTime,
    dropoff_datetime        Nullable(DateTime),

    store_and_fwd_flag      Nullable(FixedString(1)),
    rate_code_id            Nullable(UInt8),
    pickup_longitude        Nullable(Float64),
    pickup_latitude         Nullable(Float64),
    dropoff_longitude       Nullable(Float64),
    dropoff_latitude        Nullable(Float64),
    passenger_count         Nullable(UInt8),
    trip_distance           Nullable(Float64),
    fare_amount             Nullable(Float32),
    extra                   Nullable(Float32),
    mta_tax                 Nullable(Float32),
    tip_amount              Nullable(Float32),
    tolls_amount            Nullable(Float32),
    ehail_fee               Nullable(Float32),
    improvement_surcharge   Nullable(Float32),
    total_amount            Nullable(Float32),
    payment_type            Nullable(String),
    trip_type               Nullable(UInt8),
    pickup                  Nullable(String),
    dropoff                 Nullable(String),

    cab_type                Nullable(String),

    precipitation           Nullable(Int8),
    snow_depth              Nullable(Int8),
    snowfall                Nullable(Int8),
    max_temperature         Nullable(Int8),
    min_temperature         Nullable(Int8),
    average_wind_speed      Nullable(Int8),

    pickup_nyct2010_gid     Nullable(Int8),
    pickup_ctlabel          Nullable(String),
    pickup_borocode         Nullable(Int8),
    pickup_boroname         Nullable(String),
    pickup_ct2010           Nullable(String),
    pickup_boroct2010       Nullable(String),
    pickup_cdeligibil       Nullable(FixedString(1)),
    pickup_ntacode          Nullable(String),
    pickup_ntaname          Nullable(String),
    pickup_puma             Nullable(String),

    dropoff_nyct2010_gid    Nullable(UInt8),
    dropoff_ctlabel         Nullable(String),
    dropoff_borocode        Nullable(UInt8),
    dropoff_boroname        Nullable(String),
    dropoff_ct2010          Nullable(String),
    dropoff_boroct2010      Nullable(String),
    dropoff_cdeligibil      Nullable(String),
    dropoff_ntacode         Nullable(String),
    dropoff_ntaname         Nullable(String),
    dropoff_puma            Nullable(String)
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'trips',
                            kafka_group_name = 'group1',
                            kafka_format = 'CSV',
                            kafka_num_consumers = 4;

INSERT INTO trips
SELECT * FROM trips_kafka_csv;

During the above operation the CPU was at 50% capacity and there was almost no disk activity to speak of. The progress indicator showed 3,170 rows per second being imported into ClickHouse.

→ Progress: 5.67 million rows, 3.33 GB (3.17 thousand rows/s., 1.86 MB/s.)

This is a capture from top during the above operation.

top - 02:53:24 up  1:30,  3 users,  load average: 0.99, 2.14, 1.75
Tasks: 206 total,   2 running, 204 sleeping,   0 stopped,   0 zombie
%Cpu0  :  0.4 us,  0.0 sy,  0.0 ni, 99.6 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu1  :  1.6 us,  0.4 sy,  0.0 ni, 98.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu2  :  8.3 us, 91.7 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu3  :  0.4 us,  0.4 sy,  0.0 ni, 99.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem : 24.3/8157556  [|||||||||||||||||||||||                                                                      ]
KiB Swap:  1.2/8385532  [|                                                                                            ]

   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
 11422 clickho+  20   0 5544596 417140  29588 S 150.2  5.1   3:24.86 /usr/bin/clickhouse-server
 30574 root      20   0 5730472 998060  10408 S   0.7 12.2   8:58.01 java -Xmx1G -Xms1G -server

Had I allowed the operation to continue it would have taken seven hours to complete. I haven't come across any obvious parameter tuning in the ClickHouse manual at this point so I'll have to see if this can be addressed in a later post.
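The seven-hour estimate follows directly from the rate shown by the progress indicator. As a quick check, assuming that rate stayed constant for the whole import:

```shell
ROWS=80000000   # rows in the dataset
RATE=3170       # rows per second reported by the progress indicator

# Whole seconds needed at that rate, then the same figure expressed in hours.
SECONDS_NEEDED=$(( ROWS / RATE ))
awk -v s="$SECONDS_NEEDED" \
    'BEGIN { printf "%d seconds, about %.1f hours\n", s, s / 3600 }'
```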

Traditionally I'd use Flume to land Kafka streams onto HDFS and then run periodic jobs via Airflow to import time-partitioned datasets into OLAP systems similar to ClickHouse. At some point when I've uncovered the bottleneck above I'll aim to publish a comparison of these methods.

Importing CSV

I'll import four GZIP-compressed CSV files sequentially using gunzip to decompress the contents before presenting them to ClickHouse via a UNIX pipe. Each file is slightly less than 1,929 MB in size when compressed.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ for FILENAME in *.csv.gz; do
      gunzip -c $FILENAME \
          | clickhouse-client \
              --query="INSERT INTO trips FORMAT CSV"
  done

The above completed in 4 minutes and 45 seconds.

I'll perform the same import but use four parallel processes.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ vi jobs
gunzip -c 000000_0.csv.gz | clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\"
gunzip -c 000000_1.csv.gz | clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\"
gunzip -c 000000_2.csv.gz | clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\"
gunzip -c 000000_3.csv.gz | clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\"
$ cat jobs | xargs -n1 -P4 -I% bash -c "%"

The above completed in 4 minutes and 39 seconds. Barely any improvement at all.

I'll see if using data that is already decompressed speeds up the import process. Below I'll import four decompressed files sequentially.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ for FILENAME in *.csv; do
      clickhouse-client \
          --query="INSERT INTO trips FORMAT CSV" < $FILENAME
  done

The above completed in 5 minutes and 59 seconds.

I'll try the same operation with four parallel processes.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ vi jobs
clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\" < 000000_0.csv
clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\" < 000000_1.csv
clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\" < 000000_2.csv
clickhouse-client --query=\"INSERT INTO trips FORMAT CSV\" < 000000_3.csv
$ cat jobs | xargs -n1 -P4 -I% bash -c "%"

The above completed in 6 minutes and 6 seconds. It appears that parallel imports, regardless of file compression, have little or even a negative impact on import times. Reading the larger uncompressed files off disk also appears to be slower than reading compressed data and decompressing it just prior to import.
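To put the four CSV runs side by side, the times reported in this section can be converted into seconds and an average import rate. The helper name summarise is made up for this sketch, and the rates are averages over each whole run and are truncated to whole rows.

```shell
# summarise: read "label|M:SS" lines on stdin and print the duration in
# seconds plus the average rows per second for an 80-million-row import.
summarise() {
    awk -F'|' '{
        split($2, t, ":")              # t[1] = minutes, t[2] = seconds
        secs = t[1] * 60 + t[2]
        printf "%-28s %4d s %7d rows/s\n", $1, secs, 80000000 / secs
    }'
}

summarise <<'EOF'
gzip, sequential|4:45
gzip, four parallel|4:39
uncompressed, sequential|5:59
uncompressed, four parallel|6:06
EOF
```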

Importing Parquet

The Parquet dataset is Snappy-compressed and is made up of four files that are ~1,983 MB each in size.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ for FILENAME in *.pq; do
      cat $FILENAME \
          | clickhouse-client \
              --query="INSERT INTO trips FORMAT Parquet"
  done

The above completed in 2 minutes and 38 seconds.

I'll attempt the same operation again but using four parallel processes to import the data.

$ clickhouse-client --query="TRUNCATE TABLE trips"
$ vi jobs
cat 000000_0.pq | clickhouse-client --query=\"INSERT INTO trips FORMAT Parquet\"
cat 000000_1.pq | clickhouse-client --query=\"INSERT INTO trips FORMAT Parquet\"
cat 000000_2.pq | clickhouse-client --query=\"INSERT INTO trips FORMAT Parquet\"
cat 000000_3.pq | clickhouse-client --query=\"INSERT INTO trips FORMAT Parquet\"
$ cat jobs | xargs -n1 -P4 -I% bash -c "%"

The above completed in 2 minutes and 48 seconds. Much like what was seen with CSV data, parallel imports didn't have a positive impact on total import time.
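The gap between Parquet and CSV is easier to judge as a ratio. This sketch uses a made-up helper named speedup and the sequential times reported in this post, converted to seconds.

```shell
# speedup BASE_SECS NEW_SECS: how many times faster NEW is than BASE.
speedup() {
    awk -v base="$1" -v new="$2" 'BEGIN { printf "%.2f\n", base / new }'
}

speedup 285 158   # GZIP-compressed CSV (4:45) versus Parquet (2:38)
speedup 359 158   # uncompressed CSV (5:59) versus Parquet (2:38)
```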

Importing From HDFS

I've installed HDFS via the Hadoop distribution provided by Apache. The installation steps I took can be found in my Hadoop 3 Installation Guide. Do note I didn't install Hive, Presto or Spark for this exercise as they're not needed.

I'll copy the decompressed CSV files onto HDFS.

$ hdfs dfs -mkdir -p /csv
$ hdfs dfs -copyFromLocal ~/*.csv /csv/

I'll then connect to ClickHouse and create a table representing the CSV data on HDFS.

$ clickhouse-client
CREATE TABLE trips_hdfs_csv (
    trip_id                 UInt32,
    vendor_id               String,

    pickup_datetime         DateTime,
    dropoff_datetime        Nullable(DateTime),

    store_and_fwd_flag      Nullable(FixedString(1)),
    rate_code_id            Nullable(UInt8),
    pickup_longitude        Nullable(Float64),
    pickup_latitude         Nullable(Float64),
    dropoff_longitude       Nullable(Float64),
    dropoff_latitude        Nullable(Float64),
    passenger_count         Nullable(UInt8),
    trip_distance           Nullable(Float64),
    fare_amount             Nullable(Float32),
    extra                   Nullable(Float32),
    mta_tax                 Nullable(Float32),
    tip_amount              Nullable(Float32),
    tolls_amount            Nullable(Float32),
    ehail_fee               Nullable(Float32),
    improvement_surcharge   Nullable(Float32),
    total_amount            Nullable(Float32),
    payment_type            Nullable(String),
    trip_type               Nullable(UInt8),
    pickup                  Nullable(String),
    dropoff                 Nullable(String),

    cab_type                Nullable(String),

    precipitation           Nullable(Int8),
    snow_depth              Nullable(Int8),
    snowfall                Nullable(Int8),
    max_temperature         Nullable(Int8),
    min_temperature         Nullable(Int8),
    average_wind_speed      Nullable(Int8),

    pickup_nyct2010_gid     Nullable(Int8),
    pickup_ctlabel          Nullable(String),
    pickup_borocode         Nullable(Int8),
    pickup_boroname         Nullable(String),
    pickup_ct2010           Nullable(String),
    pickup_boroct2010       Nullable(String),
    pickup_cdeligibil       Nullable(FixedString(1)),
    pickup_ntacode          Nullable(String),
    pickup_ntaname          Nullable(String),
    pickup_puma             Nullable(String),

    dropoff_nyct2010_gid    Nullable(UInt8),
    dropoff_ctlabel         Nullable(String),
    dropoff_borocode        Nullable(UInt8),
    dropoff_boroname        Nullable(String),
    dropoff_ct2010          Nullable(String),
    dropoff_boroct2010      Nullable(String),
    dropoff_cdeligibil      Nullable(String),
    dropoff_ntacode         Nullable(String),
    dropoff_ntaname         Nullable(String),
    dropoff_puma            Nullable(String)
) ENGINE = HDFS('hdfs://localhost:9000/csv/*', 'CSV');

I'll then truncate the trips table and import the data off HDFS into a native ClickHouse table.

TRUNCATE TABLE trips;

INSERT INTO trips
SELECT * FROM trips_hdfs_csv;

The above completed in 3 minutes and 11 seconds. During the import the CPU was almost completely utilised. ClickHouse had almost three cores saturated while the HDFS DataNode took half a core to itself.

top - 07:13:06 up 30 min,  2 users,  load average: 6.72, 3.33, 1.76
Tasks: 201 total,   3 running, 198 sleeping,   0 stopped,   0 zombie
%Cpu0  : 78.0 us, 15.7 sy,  0.0 ni,  1.0 id,  3.5 wa,  0.0 hi,  1.7 si,  0.0 st
%Cpu1  : 40.0 us, 29.1 sy,  0.0 ni,  1.1 id,  8.7 wa,  0.0 hi, 21.1 si,  0.0 st
%Cpu2  : 57.6 us, 20.9 sy,  0.0 ni, 10.4 id,  9.7 wa,  0.0 hi,  1.4 si,  0.0 st
%Cpu3  : 70.3 us, 15.9 sy,  0.0 ni,  5.1 id,  5.8 wa,  0.0 hi,  2.9 si,  0.0 st
KiB Mem :  8157556 total,   149076 free,  2358936 used,  5649544 buff/cache
KiB Swap:  8386556 total,  8298996 free,    87560 used.  5465288 avail Mem

   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
  1102 clickho+  20   0 5583684 1.446g  74952 S 289.0 18.6   4:39.64 /usr/bin/clickhouse-server
  1894 root      20   0 3836828 276156  24244 S   0.3  3.4   0:09.45 /usr/bin/java -Dproc_namenode
  2059 root      20   0 3823000 268328  24388 S  47.2  3.3   3:28.72 /usr/bin/java -Dproc_datanode

Given that roughly 5.4 GiB of the machine's 7.8 GiB of memory was allocated towards the buffer / cache, I suspect HDFS has done a good job of getting Linux to keep as many HDFS blocks as possible in the Page Cache.
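
The figures in the top output above can be turned into a cache share and a per-process core count with a few lines of Python. This is only a sketch: the strings and numbers are copied from that output, the helper name parse_mem_line is made up for illustration, and it assumes top is in its default mode where 100% CPU equals one core.

```python
# Values copied from the top output above.
MEM_LINE = "KiB Mem :  8157556 total,   149076 free,  2358936 used,  5649544 buff/cache"
PROCESS_CPU = {"clickhouse-server": 289.0, "datanode": 47.2}  # %CPU column


def parse_mem_line(line):
    """Turn top's 'KiB Mem' summary line into a {label: KiB} dict."""
    _, _, fields = line.partition(":")
    stats = {}
    for field in fields.split(","):
        value, label = field.split()
        stats[label] = int(value)
    return stats


mem = parse_mem_line(MEM_LINE)

# Share of physical RAM that Linux is using for buffers and the page cache.
cache_share = mem["buff/cache"] / mem["total"]

# Assumption: 100% in the %CPU column corresponds to one fully busy core.
cores_used = {name: pct / 100 for name, pct in PROCESS_CPU.items()}

print(f"buff/cache share of RAM: {cache_share:.1%}")
for name, cores in cores_used.items():
    print(f"{name}: {cores:.2f} cores")
```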

I'll perform the same import off of HDFS but this time I'll use a Parquet-formatted, Snappy-compressed copy of the dataset. Below I'll copy the Parquet files onto HDFS.

$ hdfs dfs -mkdir -p /pq
$ hdfs dfs -copyFromLocal ~/*.pq /pq/

I'll then create a table in ClickHouse that exposes those Parquet files on HDFS.

$ clickhouse-client
CREATE TABLE trips_hdfs_pq (
    trip_id                 UInt32,
    vendor_id               String,

    pickup_datetime         DateTime,
    dropoff_datetime        Nullable(DateTime),

    store_and_fwd_flag      Nullable(FixedString(1)),
    rate_code_id            Nullable(UInt8),
    pickup_longitude        Nullable(Float64),
    pickup_latitude         Nullable(Float64),
    dropoff_longitude       Nullable(Float64),
    dropoff_latitude        Nullable(Float64),
    passenger_count         Nullable(UInt8),
    trip_distance           Nullable(Float64),
    fare_amount             Nullable(Float32),
    extra                   Nullable(Float32),
    mta_tax                 Nullable(Float32),
    tip_amount              Nullable(Float32),
    tolls_amount            Nullable(Float32),
    ehail_fee               Nullable(Float32),
    improvement_surcharge   Nullable(Float32),
    total_amount            Nullable(Float32),
    payment_type            Nullable(String),
    trip_type               Nullable(UInt8),
    pickup                  Nullable(String),
    dropoff                 Nullable(String),

    cab_type                Nullable(String),

    precipitation           Nullable(Int8),
    snow_depth              Nullable(Int8),
    snowfall                Nullable(Int8),
    max_temperature         Nullable(Int8),
    min_temperature         Nullable(Int8),
    average_wind_speed      Nullable(Int8),

    pickup_nyct2010_gid     Nullable(Int8),
    pickup_ctlabel          Nullable(String),
    pickup_borocode         Nullable(Int8),
    pickup_boroname         Nullable(String),
    pickup_ct2010           Nullable(String),
    pickup_boroct2010       Nullable(String),
    pickup_cdeligibil       Nullable(FixedString(1)),
    pickup_ntacode          Nullable(String),
    pickup_ntaname          Nullable(String),
    pickup_puma             Nullable(String),

    dropoff_nyct2010_gid    Nullable(UInt8),
    dropoff_ctlabel         Nullable(String),
    dropoff_borocode        Nullable(UInt8),
    dropoff_boroname        Nullable(String),
    dropoff_ct2010          Nullable(String),
    dropoff_boroct2010      Nullable(String),
    dropoff_cdeligibil      Nullable(String),
    dropoff_ntacode         Nullable(String),
    dropoff_ntaname         Nullable(String),
    dropoff_puma            Nullable(String)
) ENGINE = HDFS('hdfs://localhost:9000/pq/*', 'Parquet');
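
The CSV and Parquet table definitions differ only in their name, path glob and format, so both statements could be generated from a single column list. Below is a minimal Python sketch of that idea. The helper hdfs_table_ddl is hypothetical and the column list is cut down to four entries for brevity; the full list is in the CREATE TABLE statements above.

```python
# Illustrative subset of the columns used in the CREATE TABLE statements.
COLUMNS = [
    ("trip_id", "UInt32"),
    ("vendor_id", "String"),
    ("pickup_datetime", "DateTime"),
    ("dropoff_datetime", "Nullable(DateTime)"),
]


def hdfs_table_ddl(name, uri, fmt, columns=COLUMNS):
    """Return a CREATE TABLE statement for a table backed by the HDFS engine."""
    width = max(len(col) for col, _ in columns) + 1
    body = ",\n".join(f"    {col.ljust(width)}{typ}" for col, typ in columns)
    return f"CREATE TABLE {name} (\n{body}\n) ENGINE = HDFS('{uri}', '{fmt}');"


csv_ddl = hdfs_table_ddl("trips_hdfs_csv", "hdfs://localhost:9000/csv/*", "CSV")
pq_ddl = hdfs_table_ddl("trips_hdfs_pq", "hdfs://localhost:9000/pq/*", "Parquet")

print(pq_ddl)
```

The generated statement could then be pasted into clickhouse-client, which keeps the two schemas from drifting apart.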

I'll then truncate the trips table and import the data off HDFS into a native ClickHouse table.

TRUNCATE TABLE trips;

INSERT INTO trips
SELECT * FROM trips_hdfs_pq;

The above operation failed, stating I'd hit ClickHouse's single-query memory limit.

Received exception from server (version 19.15.3):
Code: 241. DB::Exception: Received from localhost:9001. DB::Exception: Memory limit (for query) exceeded: would use 10.63 GiB (attempt to allocate chunk of 2684354560 bytes), maximum: 9.31 GiB.

I had to upgrade the system's RAM from 8 GB to 20 GB and raise the per-query memory limit to an arbitrarily high 20,000,000,000 bytes.

TRUNCATE TABLE trips;

SET max_memory_usage = 20000000000;

INSERT INTO trips
SELECT * FROM trips_hdfs_pq;

The above completed in 1 minute and 56 seconds, the fastest of any method I've attempted. During the import the CPU was almost fully utilised, the SSD was nearing its throughput limits and almost the entirety of the RAM available to the system was in use.
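
The error message reports sizes in GiB while max_memory_usage is set in bytes, so a quick conversion helps when comparing the two. The Python sketch below uses only the figures quoted above; the helper name to_gib is made up for illustration.

```python
GIB = 2 ** 30  # bytes in one gibibyte


def to_gib(num_bytes):
    """Convert a byte count into GiB."""
    return num_bytes / GIB


failed_chunk = to_gib(2_684_354_560)   # the single allocation that failed
old_limit = to_gib(10_000_000_000)     # 10^10 bytes, reported as 9.31 GiB
new_limit = to_gib(20_000_000_000)     # value passed to SET max_memory_usage

print(f"failed chunk: {failed_chunk:.2f} GiB")
print(f"old limit:    {old_limit:.2f} GiB")
print(f"new limit:    {new_limit:.2f} GiB")
```

The new limit comfortably exceeds the 10.63 GiB the failed query said it would need.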

Conclusion

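The format in which data is presented to ClickHouse and the storage system it is read from can have a huge impact on import times. Importing GZIP-compressed JSON off of disk was 7.3x slower than loading Parquet off of HDFS. Parallelism didn't help much either: parallel imports finished within about 6% of their sequential counterparts for CSV and Parquet, and only uncompressed JSON saw a clear gain (522 seconds versus 610).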

It was great to see how fast Parquet data would load off of HDFS but it's a shame to see the RAM-intensity of this operation. Every other import ran fine with no more than 8 GB of system memory, whereas this one only completed after the four-core machine was upgraded to 20 GB, which works out to 5 GB of RAM per core.

Below is a summary, sorted from fastest to slowest, of the above imports.

Seconds Method
116 Snappy-compressed Parquet off HDFS (sequential)
158 Snappy-compressed Parquet off disk (sequential)
168 Snappy-compressed Parquet off disk (parallel x 4)
191 Uncompressed CSV off HDFS (sequential)
278 GZIP-compressed CSV off disk (parallel x 4)
285 GZIP-compressed CSV off disk (sequential)
358 Uncompressed CSV off disk (sequential)
366 Uncompressed CSV off disk (parallel x 4)
522 Uncompressed JSON off disk (parallel x 2)
579 Imported from PostgreSQL
610 Uncompressed JSON off disk (sequential)
657 Imported from MySQL
849 GZIP-compressed JSON off disk (sequential)
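
The headline ratio and the effect of parallelism can be recomputed from the table above. The Python sketch below copies the timings as they appear there; the helper name percent_change is made up for illustration.

```python
# (sequential seconds, parallel seconds) copied from the summary table.
PAIRS = {
    "Snappy-compressed Parquet off disk": (158, 168),
    "GZIP-compressed CSV off disk": (285, 278),
    "Uncompressed CSV off disk": (358, 366),
    "Uncompressed JSON off disk": (610, 522),
}
FASTEST = 116  # Snappy-compressed Parquet off HDFS
SLOWEST = 849  # GZIP-compressed JSON off disk


def percent_change(sequential, parallel):
    """Relative change in import time; negative means parallel was faster."""
    return (parallel - sequential) / sequential * 100


changes = {name: percent_change(*pair) for name, pair in PAIRS.items()}
headline = SLOWEST / FASTEST

print(f"slowest vs fastest: {headline:.1f}x")
for name, change in changes.items():
    print(f"{name}: {change:+.1f}% when run in parallel")
```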

Another interesting highlight was the time it took to get the dataset into PostgreSQL, MySQL and Kafka sans any indexing.

Seconds Method
358 Uncompressed CSV into ClickHouse
510 Uncompressed CSV into Kafka
692 Uncompressed CSV into PostgreSQL
1223 Uncompressed CSV into MySQL

The above could probably be improved upon but nonetheless I'm surprised to see such a delta with bulk importing into row-centric storage.
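
To put a number on that delta, each time can be divided by the ClickHouse figure. This is a small Python sketch using only the timings listed in the table.

```python
CLICKHOUSE_SECONDS = 358  # uncompressed CSV into ClickHouse
OTHERS = {"Kafka": 510, "PostgreSQL": 692, "MySQL": 1223}

# How many times longer each system took than ClickHouse for the same CSV data.
slowdown = {name: secs / CLICKHOUSE_SECONDS for name, secs in OTHERS.items()}

for name, factor in slowdown.items():
    print(f"{name}: {factor:.2f}x the ClickHouse import time")
```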


Copyright © 2014 - 2019 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.