Home | Benchmarks | Archives | Atom Feed

Posted on Mon 19 March 2018

Hadoop 3 Single-Node Install Guide

Hadoop 3 was released in December 2017. It's a major release with a number of interesting new features. It's early days but I've found so far in my testing it hasn't broken too many of the features or processes I commonly use day to day in my 2.x installations. Again, early days but I'm happy so far with what the Hadoop community has delivered.

For anyone that doesn't know Hadoop's back story, it's open source software that lets you work with petabytes of data across 1,000s of cheap computers. It's used by the likes of Amazon, Apple, Facebook as well as many Banks, Governments and Telecommunications firms. If an enterprise is data-driven they most likely have at least one Hadoop deployment.

This post is meant to help people explore Hadoop 3 without feeling the need they should be using 50+ machines to do so. I'll be using a fresh installation of Ubuntu 16.04.2 LTS on a single computer. The machine has an Intel Core i5-7300HQ CPU clocked at 2.50GHz, 8 GB of RAM and a 200 GB mechanical disk drive. I intentionally picked a low end machine to demonstrate not much is needed to try out Hadoop in a learning exercise.

Please do be mindful these instructions are aimed at building a test environment that is cut off from the outside world. Beyond the fact this is a single machine installation for software which is meant to run on multiple machines there would need to be significant content changes to turn these instructions into production installation notes.

Installing Key Prerequisites

I'll first install SSH, MySQL and a few software installation utilities for Java 8.

$ sudo apt install \
    openssh-server \
    software-properties-common \
    python-software-properties \

I'll then install Oracle's Java 8 distribution.

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt update
$ sudo apt install oracle-java8-installer

I'll setup access for Hive to use MySQL for its metadata backend. I'll be using root's shell for this.

When you launch MySQL you'll be prompted for the MySQL root password you entered during installation of that software. If you didn't set a password then just hit enter.

$ sudo su
$ mysql -uroot -p

The following creates a hive account with the password hive. Please do feel free to pick credentials that don't match this guide's choices.

CREATE USER 'hive'@'localhost' IDENTIFIED BY 'hive';
GRANT ALL PRIVILEGES ON *.* TO 'hive'@'localhost';

By default Hadoop starts and stops services with all nodes via SSH even if it's a single-node installation. For this reason I'll setup SSH access via keys to avoid password prompts.

$ ssh-keygen
$ cp /root/.ssh/id_rsa.pub \

The following will test that SSH connects without issue and can execute an uptime command.

$ ssh localhost uptime

With that in place I'll exit root's shell and return to my regular shell.

$ exit

Centralising Hadoop & Spark Settings

There are environment settings that will be used by Hadoop, Hive and Spark and shared by both root and the regular user accounts. To centralise these settings I've stored them in /etc/profile and created a symbolic link from /root/.bashrc to this file as well. That way all users will have centrally-managed settings.

$ sudo vi /etc/profile
if [ "$PS1" ]; then
  if [ "$BASH" ] && [ "$BASH" != "/bin/sh" ]; then
    # The file bash.bashrc already sets the default PS1.
    # PS1='\h:\w\$ '
    if [ -f /etc/bash.bashrc ]; then
      . /etc/bash.bashrc
    if [ "`id -u`" -eq 0 ]; then
      PS1='# '
      PS1='$ '

if [ -d /etc/profile.d ]; then
  for i in /etc/profile.d/*.sh; do
    if [ -r $i ]; then
      . $i
  unset i

export HADOOP_HOME=/opt/hadoop
export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:/opt/hive/bin:/opt/spark/bin:/opt/presto/bin
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
export SPARK_HOME=/opt/spark
export SPARK_CONF_DIR=/opt/spark/conf
export SPARK_MASTER_HOST=localhost
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
$ sudo ln -sf /etc/profile \
$ source /etc/profile

Downloading Hadoop, Hive, Spark & Presto

I'm going to install all the software under the /opt directory and store HDFS' underlying data there as well. Below will create the folders with a single command.

$ sudo mkdir -p /opt/{hadoop,hdfs/{datanode,namenode},hive,presto/{etc/catalog,data},spark}

The layout of the folders looks like the following.

├── hadoop
├── hdfs
│   ├── datanode
│   └── namenode
├── hive
├── presto
│   ├── data
│   └── etc
│       └── catalog
└── spark

The following downloads Hadoop, Hive, Spark & Presto.

$ DIST=http://www-eu.apache.org/dist

$ wget -c -O hadoop.tar.gz  $DIST/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
$ wget -c -O hive.tar.gz    $DIST/hive/hive-2.3.2/apache-hive-2.3.2-bin.tar.gz
$ wget -c -O spark.tgz      $DIST/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
$ wget -c -O presto.tar.gz  https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.196/presto-server-0.196.tar.gz
$ wget -c -O presto-cli.jar https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.196/presto-cli-0.196-executable.jar

The binary release of Hadoop 3.0.0 is 293 MB compressed. Its decompressed size is 733 MB with 400 MB of small documentation files that take a long time to decompress. For this reason I'll skip these files.

Hive has a large number of unit test files which are excluded from decompression as well.

$ sudo tar xvf hadoop.tar.gz \
      --directory=/opt/hadoop \
      --exclude=hadoop-3.0.0/share/doc \
      --strip 1

$ sudo tar xvf hive.tar.gz \
      --directory=/opt/hive \
      --exclude=apache-hive-2.3.2-bin/ql/src/test \
      --strip 1

$ sudo tar xzvf spark.tgz \
    --directory=/opt/spark \
    --strip 1

$ sudo tar xvf presto.tar.gz \
    --directory=/opt/presto \
    --strip 1

I'll move Presto's CLI into its binary folder and make sure it's executable.

$ sudo mv presto-cli.jar /opt/presto/bin/presto
$ sudo chmod +x /opt/presto/bin/presto

Configuring Hadoop

This will be a single machine installation so the master and slave nodes list for Hadoop will just be localhost.

$ sudo vi /opt/hadoop/etc/hadoop/master
$ sudo vi /opt/hadoop/etc/hadoop/slaves

I'll discuss HDFS in greater detail later in this blog post but essentially it is the file system you most commonly use with Hadoop when not working in the Cloud.

Below I'll create two configuration files with overrides needed for HDFS. I'll be setting a default replication factor to one as I only have a single hard drive to store data on.

$ sudo vi /opt/hadoop/etc/hadoop/core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
$ sudo vi /opt/hadoop/etc/hadoop/hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

Hadoop has hundreds of settings for many of its services. The XML configuration files allow you to just state which default settings you want to change which helps keep these files shorter than they'd otherwise be.

HDFS: Hadoop's Distributed File System

HDFS is Hadoop's distributed file system and one of its greatest features. Among many other things, it helps get data closer to all the machines computing said data.

With HDFS replication if you have 20 machines wanting to work with one file and that file only lives on a single machine there will be a bottleneck. Now if you replicate that file on to three HDFS machines then you'd cut down the backlog by a 3rd.

Hadoop was originally designed to work on cheap, potentially unreliable mechanical disks that could corrupt data or suffer other issues. If some data was unavailable or corrupted on one HDFS node's disk other machines might have a good copy of the data. Prior to Hadoop 3 the default replication setting was three copies. In Hadoop 3 the default is to use Erasure Coding which will only need to use 1.6x the file's size in order to better ensure the data is available.

I recommend changing the replication settings listed in this blog post and explore Erasure Coding.

Setting up HDFS

Below I'll switch to root's shell, format the HDFS file system and start the HDFS service.

$ sudo su
$ hdfs namenode -format
$ start-dfs.sh

I'll grant my mark UNIX account access to the whole file system as well. Be sure to replace mark with your UNIX account name.

$ hdfs dfs -chown mark /

I'll then check the disk capacity available to HDFS looks sensible.

$ hdfs dfsadmin -report \
    | grep 'Configured Capacity' \
    | tail -n1
Configured Capacity: 203795914752 (189.80 GB)

With that all finished I'll exit root's shell.

$ exit

Setting up Hive

Hive, among other things, describes the table schemas of the files being stored on HDFS to many pieces of software in the Hadoop Ecosystem. It can also be used to convert data from one format to another. It is very much a Swiss Army Knife.

The key configuration being set below tell Hive how to speak to MySQL. MySQL holds the underlying metadata Hive generates. Be mindful there is a username and password for accessing MySQL in the middle of the XML.

$ sudo vi /opt/hive/conf/hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

Hive will also need a connector for MySQL. The following will download one and store it in Hive's libraries folder.

$ sudo wget \
    -c http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.28/mysql-connector-java-5.1.28.jar \
    -P /opt/hive/lib/

I'll initialise the MySQL tables Hive will use and launch it's Metastore service.

$ schematool -dbType mysql -initSchema
$ hive --service metastore &

Spark will need access to Hive's configuration file and the MySQL connector so I'll create symbolic links to keep them centralised.

$ sudo ln -s /opt/hive/conf/hive-site.xml \

$ sudo ln -s /opt/hive/lib/mysql-connector-java-5.1.28.jar \

Setting up Spark

When Spark launches jobs it transfers its jar files to HDFS so they're available to any machines working on said job. These files are a large overhead on smaller jobs so I've packaged them up, copied them to HDFS and told Spark it doesn't need to copy them over any more.

$ jar cv0f ~/spark-libs.jar -C /opt/spark/jars/ .
$ hdfs dfs -mkdir /spark-libs
$ hdfs dfs -put ~/spark-libs.jar /spark-libs/
$ sudo vi /opt/spark/conf/spark-defaults.conf
spark.master spark://localhost:7077
spark.yarn.preserve.staging.files true
spark.yarn.archive hdfs:///spark-libs/spark-libs.jar

The machine I'm using doesn't have a lot of memory so I've set memory limits for Spark pretty low. If you have more memory available on your system feel free to increase these figures.

$ sudo vi /opt/spark/conf/spark-env.sh

The following will configure Spark to only use localhost for processing jobs.

$ sudo vi /opt/spark/conf/slaves

The following will launch Spark's services.

$ sudo /opt/spark/sbin/start-master.sh
$ sudo /opt/spark/sbin/start-slaves.sh

Importing Data into Hadoop

Each tool in the Hadoop Ecosystem has certain file formats it prefers that will outperform others. Each piece of software tends also to use their own code for reading each format so finding the fastest format to use for your data often requires knowing which software package(s) you'll be working with.

This is also an area that changes a lot with just about every major release of many tools. Spark 2.3 added a vectorised ORC file reader they claim is 2-5x quicker than their previous reader code and this is a project that's had code committed to it eight years ago. The search for more performance is never ending.

With all that said, if data is stored in a columnar fashion it'll lend itself to being quicker to run analytics on. Below I'll be converting CSV data into ORC format. CSV data is stored in a row-centric form while ORC stores data in a columnar fashion.

Below I'll import the CSV file off the regular file system onto HDFS. The file has 20 million records and 51 columns. It is one of 56 files that makes up the dataset for the 1.1 Billion Taxi Rides Benchmarks I run.

But before I can store the CSV file in the /trips_csv folder on HDFS I need to first create the folder or I'll get an error message.

$ hdfs dfs -mkdir /trips_csv
$ hdfs dfs -copyFromLocal \
    ~/trips/trips_xaa.csv.gz \

I can now check the health telemetry on the folder the file sits in.

$ hdfs fsck /trips_csv/
 Number of data-nodes:  1
 Number of racks:               1
 Total dirs:                    1
 Total symlinks:                0

Replicated Blocks:
 Total size:    2024092233 B
 Total files:   1
 Total blocks (validated):      16 (avg. block size 126505764 B)
 Minimally replicated blocks:   16 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    1
 Average block replication:     1.0
 Missing blocks:                0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)

Erasure Coded Block Groups:
 Total size:    0 B
 Total files:   0
 Total block groups (validated):        0
 Minimally erasure-coded block groups:  0
 Over-erasure-coded block groups:       0
 Under-erasure-coded block groups:      0
 Unsatisfactory placement block groups: 0
 Average block group size:      0.0
 Missing block groups:          0
 Corrupt block groups:          0
 Missing internal blocks:       0

Converting CSV Files to ORC Format

Hadoop can work with pretty much any sort of file type but for certain tasks like aggregation there are file formats that are optimised for the task. The Taxi data imported onto HDFS is in CSV format but Spark and Presto can analyse the data quicker if it's in ORC format first. Below I'll use Hive for this conversion.

The GZIP-compressed CSV file already exists on HDFS in the /trips_csv/ folder so I'll create a table schema pointing to that folder. Any files in there will have their contents exposed as database table data. Hive will want me to address the data as a table name rather than an HDFS file location when I do the conversion.

An HDFS folder should only ever contain files that match in terms of format and schema if you're working with structured data. Mixing ORC and CSV files in the same folder will result in errors from just about every piece of software that can read structured and semi-structured data off HDFS. If you do have two copies of a dataset in different formats then they need to live in separate folders in order not to confuse any software trying to read each copy.

$ hive
    trip_id                 INT,
    vendor_id               VARCHAR(3),
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      VARCHAR(1),
    rate_code_id            SMALLINT,
    pickup_longitude        DECIMAL(18,14),
    pickup_latitude         DECIMAL(18,14),
    dropoff_longitude       DECIMAL(18,14),
    dropoff_latitude        DECIMAL(18,14),
    passenger_count         SMALLINT,
    trip_distance           DECIMAL(6,3),
    fare_amount             DECIMAL(6,2),
    extra                   DECIMAL(6,2),
    mta_tax                 DECIMAL(6,2),
    tip_amount              DECIMAL(6,2),
    tolls_amount            DECIMAL(6,2),
    ehail_fee               DECIMAL(6,2),
    improvement_surcharge   DECIMAL(6,2),
    total_amount            DECIMAL(6,2),
    payment_type            VARCHAR(3),
    trip_type               SMALLINT,
    pickup                  VARCHAR(50),
    dropoff                 VARCHAR(50),

    cab_type                VARCHAR(6),

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          VARCHAR(10),
    pickup_borocode         SMALLINT,
    pickup_boroname         VARCHAR(13),
    pickup_ct2010           VARCHAR(6),
    pickup_boroct2010       VARCHAR(7),
    pickup_cdeligibil       VARCHAR(1),
    pickup_ntacode          VARCHAR(4),
    pickup_ntaname          VARCHAR(56),
    pickup_puma             VARCHAR(4),

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         VARCHAR(10),
    dropoff_borocode        SMALLINT,
    dropoff_boroname        VARCHAR(13),
    dropoff_ct2010          VARCHAR(6),
    dropoff_boroct2010      VARCHAR(7),
    dropoff_cdeligibil      VARCHAR(1),
    dropoff_ntacode         VARCHAR(4),
    dropoff_ntaname         VARCHAR(56),
    dropoff_puma            VARCHAR(4)
  LOCATION '/trips_csv/';

Below I'll define a new table trips_orc and a folder on HDFS called /trips_orc/ that hasn't yet been created. The schema for trips_orc matches the CSV file's schema.

The trips_orc table has an instruction at the bottom to store it's contents in ORC format. I can then run an INSERT INTO / SELECT query in Hive and it will convert the CSV data from the /trips_csv/ folder into ORC format and store it in the /trips_orc/ folder on HDFS.

The trips_orc table will then be available for SQL queries and I can run SELECT COUNT(*) FROM trips_csv or SELECT COUNT(*) FROM trips_orc and the respective datasets will be queried individually.

    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
  LOCATION '/trips_orc/';

The following will convert the CSV data into ORC. If it is run more than once the data will be duplicated. This can be handy if you want to generate more data when benchmarking.

INSERT INTO trips_orc
SELECT * FROM trips_csv;

Also, any new files stored in an HDFS folder will be available automatically when Hive refreshes its metadata. So if you create a new CSV file each day for a dataset and store it in a folder with previous days' data it will automatically join the rest of the dataset. Any SQL queries run will see it as appended data.

The Apache Spark Project

Spark has possibly one of the longest feature lists of any piece of software in the Hadoop Ecosystem. There are almost six times as many people that have contributed code to it compared to the Hadoop project itself.

It's great for both analysis and creating derivative datasets. You can develop in Spark using not only SQL and Java but Python, Scala and R as well. The Machine Learning functionality found in Spark is probably one of the best ML platforms freely available today.

Its analytics functionality is very fast and has done well in benchmarks against the likes of Presto.

It also has one of the few query engines I've managed to get to query the entire 1.1 billion row taxi ride dataset using a three-node Raspberry Pi Hadoop cluster.

It works well with unstructured data like HTML, semi-structured data like JSON and structured data formats such as ORC and Parquet. I used Spark to analyse an archive of 3 petabytes of web pages collected by Common Crawl last year.

In 2014 a Spark installation sorted 100 terabytes of data in 23 minutes and in 2016 it sorted 100 terabyte using $144 worth of compute resources.

Querying Data with Spark

Below I will launch a SparkSQL CLI that can query data stored on HDFS and Hive.

$ spark-sql \
    --master spark://ubuntu:7077 \
    --num-executors 1

Note that my hosts file presents ubuntu as the hostname the Spark Master binds to. When I change this parameter to localhost it won't connect so be mindful of what you're /etc/hosts file looks like and check the Spark Master's log for which hostname it has picked to use.

The following is an example SQL query that runs on the taxi dataset.

SELECT cab_type,
FROM trips_orc
GROUP BY cab_type;

To preserve memory on this system I'll shut down Spark's services before moving on to evaluate Presto.

$ sudo /opt/spark/sbin/stop-master.sh
$ sudo /opt/spark/sbin/stop-slaves.sh

Facebook's Presto Query Engine

In 2013 Facebook began work on a new SQL query engine that could analyse the petabytes of data they store faster than any other Hadoop-based engine at the time. Today Presto is used by 1,000+ staff at Facebook to analyse over 300 petabytes of data. The software is also in use at companies like Dropbox and AirBNB and is the backend for Amazon's Athena offering.

Presto isn't so much a database in that it doesn't store any data itself. It's meant to query data stored outside of the computers Presto runs on, compute results and display them back to the user. This act of separating the machines that store data from the machines that look at that data is a key feature in the Hadoop Ecosystem's scaling model.

If you are on a budget and like the sound of decoupling data storage from compute then services like Amazon's EMR support Presto and storing data on S3. You'll save yourself from having to plan out disk capacity schedules and share data between different clusters without a lot of setup.

Once you have an EMR cluster up you can add and remove compute nodes mid-flight and Presto will pick up and drop new workers automatically.

Hive and Spark have a feature overlap with Presto in the SELECT query space but when it comes to aggregate operations only Spark really keeps up with Presto. Where Presto doesn't have a large feature overlap with Spark is in producing complex derivative results. Presto is best for fast analysis. All that being said Presto has support for performing geospatial, JSON, string, date, mathematical and a whole host of other operations.

In terms of integration, Presto doesn't just work with Hive and HDFS. It has connectors to query data from Cassandra, MongoDB, PostgreSQL, Redis, RedShift, SQL Server and a number of other data stores.

When learning about Presto the documentation on the website will get you started but the software has a new release every two weeks or so and the majority of information around new features and fixes lives in the release notes.

Setting up Presto

There are six configuration files I'll will be creating for this Presto installation. These will merge the responsibilities of the coordinator and a single worker onto this single-node installation. Presto's deployment documentation does a good job of explaining each setting's purpose.

The following is a worker configuration file. The data directory will only contain log and not any of the data being queried or result sets.

$ sudo vi /opt/presto/etc/node.properties

Presto is written in Java and as such allows us to set various settings for the JVM.

$ sudo vi /opt/presto/etc/jvm.config

Normally the coordinator and workers live on separate machines but since this is a single-node installation this file I've merged their respective overrides.

$ sudo vi /opt/presto/etc/config.properties

The following controls the verbosity of logging.

$ sudo vi /opt/presto/etc/log.properties

The following tells Presto of the JMX connector.

$ sudo vi /opt/presto/etc/jmx.properties

The following tells Presto of the Hive connector.

$ sudo vi /opt/presto/etc/catalog/hive.properties

I'll launch the coordinator with the following command.

$ sudo /opt/presto/bin/launcher start

TCP port 8080 will expose a web service if there aren't any issues. If anything does go wrong /opt/presto/data will contain log files explaining what happened.

Querying Data with Presto

The following will launch Presto's command line query interface and connect it to Hive.

$ presto \
    --server localhost:8080 \
    --catalog hive \
    --schema default

I can now query the data stored on HDFS.

SELECT cab_type,
FROM trips_orc
GROUP BY cab_type;

When you execute queries Presto's Web UI will populate with telemetry. I think it is one of the best looking UIs in the Hadoop Ecosystem.

$ open http://localhost:8080/
Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2017 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.