Home | Benchmarks | Archives | Atom Feed

Posted on Mon 25 June 2018

Customising Airflow: Beyond Boilerplate Settings

Apache Airflow is a data pipeline orchestration tool. It helps run periodic jobs that are written in Python, monitor their progress and outcome, retry failed jobs and convey events in a colourful and concise Web UI. Jobs, known as DAGs, have one or more tasks. Tasks can be any sort of action such as downloading a file, converting an Excel file to CSV or launching a Spark job on a Hadoop cluster. Airflow will isolate the logs created during each task and presents them when the status box for the respective task is clicked on. This process helps investigate failures much quicker than having to search endlessly through lengthy, aggregated log files.

Over the past 18 months nearly every platform engineering job specification I've come across has mentioned the need for Airflow expertise. Airflow's README file lists over 170 firms that have deployed Airflow in their enterprise. Even Google has taken notice, they've recently announced a managed Airflow service on their Google Cloud Platform called Composer. This offering uses Kubernetes to execute jobs in their own, isolated Compute Engine instances and resulting logs are stored on their Cloud Storage service.

Orchestration tools are nothing new. When Airflow began its life in 2014 there were already a number of other tools which provided functionality that overlaps with Airflow's offerings. But Airflow was unique in that it's a reasonably small Python application with a great-looking Web UI. Installation of Airflow is concise and mirrors to the process of most Python-based web applications. Jobs are written in Python and are easy for newcomers to put together. I believe these are the key reasons for Airflow's popularity and dominance in the orchestration space.

Airflow has a healthy developer community behind it. As of this writing, 486 engineers from around the world have contributed code to the project. These engineers include staff from data-centric firms such as AirBNB, Bloomberg, Japan's NTT (the World's 4th largest telecoms firm) and Lyft. The project has a strong enough developer community around it that the original author, Maxime Beauchemin, has been able to step back from making major code contributions over the past two years without a significant impact on development velocity.

I've already covered performing a basic Airflow installation as a part of a Hadoop cluster and put together a walk-through on creating a Foreign Exchange Rate Airflow job. In this blog post I'm going to walk through six features that should prove helpful in customising your Airflow installations.

Installing Prerequisites

I recommend Airflow being installed on a system that has at least 8 GB of RAM and 100 GB of disk capacity. I try to ensure jobs don't leave files on the drive Airflow runs but if that does happen, it's good to have a 100 GB buffer to spot these sorts of issues before the drive fills up.

The following will install Python, PostgreSQL, git and the screen utility. This was run on a fresh installation of Ubuntu 16.04.2 LTS.

$ sudo apt install \
    git \
    postgresql \
    python \
    python-pip \
    python-virtualenv \
    screen

I'll create a Python virtual environment, activate it and install a few Python-based packages that will be used throughout this blog post.

$ virtualenv ~/.air_venv
$ source ~/.air_venv/bin/activate

$ pip install \
    'apache-airflow==1.9.0' \
    boto3 \
    cryptography \
    dask \
    'distributed>=1.17.1, <2' \
    flask_bcrypt \
    psycopg2-binary

Using PostgreSQL for Metadata

In other Airflow posts I've written I've used MySQL and SQLite to store Airflow's Metadata but over the past year or so when I've deployed Airflow into production I've been using PostgreSQL. I've found PostgreSQL good for concurrency, storing time zone information in timestamps and having great defaults in its command line tools. Airflow will use SQLite by default but I recommend switching to PostgreSQL for the reasons mentioned above.

Below I'll create a PostgreSQL user account that matches my UNIX username and as well as a new PostgreSQL database for Airflow.

$ sudo -u postgres \
    bash -c "psql -c \"CREATE USER mark
                       WITH PASSWORD 'airflow_password'
                       SUPERUSER;\""
$ createdb airflow

I'll initialise Airflow and edit its configuration file changing the Metastore's connection details to the PostgreSQL installation above.

$ cd ~
$ airflow initdb
$ vi ~/airflow/airflow.cfg
[core]
sql_alchemy_conn = postgresql+psycopg2://mark:airflow_password@localhost/airflow

I'll run initdb again to setup the tables in PostgreSQL and I'll upgrade those tables with the latest schema migrations.

$ airflow initdb
$ airflow upgradedb

Storing Logs on AWS S3

By default, Airflow stores log files locally without compression. If you are running a lot of jobs or even a small number of jobs frequently, disk space can get eaten up pretty fast. Storing logs on S3 not only alleviates you from disk maintenance considerations, the durability of these files will be better guaranteed with AWS S3's 11 9s of durability versus what most disk manufactures and file systems can offer.

The following will create a connection to an S3 account. Fill in the access and secret keys with an account of your own. I recommend the credentials only having read/write access to a dedicated S3 bucket.

$ airflow connections \
    --add \
    --conn_id AWSS3LogStorage \
    --conn_type s3 \
    --conn_extra '{"aws_access_key_id": "...", "aws_secret_access_key": "..."}'

The following will create a logging configuration. Note there is an S3_LOG_FOLDER setting at the top of the file. This should be replaced with the location where you want to store your log files on S3. If this isn't replaced with a valid location Airflow will produce a lot of error messages.

$ mkdir -p ~/airflow/config
$ touch ~/airflow/config/__init__.py
$ vi ~/airflow/config/log_config.py
import os

from airflow import configuration as conf


S3_LOG_FOLDER = 's3://<your s3 bucket>/airflow-logs/'

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}

The following will modify Airflow's settings so that it uses the logging configuration above. The three configuration settings are under the [core] section but there will be boilerplate settings for each item spread throughout the section. Take your time editing each line of configuration individually.

$ vi ~/airflow/airflow.cfg
[core]
task_log_reader = s3.task
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = AWSS3LogStorage

Setup User Accounts

Airflow supports login and password authentication. Airflow doesn't ship with a command line tool for creating accounts but if you launch Python's REPL you'll be able to programmatically create user accounts.

The following configuration changes are needed to enable password authentication. Make sure these changes are under the [webserver] section and not the [api] section (which also has an auth_backend directive).

$ vi ~/airflow/airflow.cfg
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

Then launch Python's REPL.

$ python

The code below creates a user account. The password will be stored using a cryptographic hash.

import airflow
from airflow import (models,
                     settings)
from airflow.contrib.auth.backends.password_auth import PasswordUser


user = PasswordUser(models.User())
user.username = 'username'
user.email    = 'user@example.com'
user.password = 'password'

session = settings.Session()
session.add(user)
session.commit()
session.close()

Run Jobs with Dask.distributed

Dask is a parallel computing library that includes a lightweight task scheduler called Dask.distributed. In the past I've used Celery and RabbitMQ to run Airflow jobs but recently I've been experimenting with using Dask instead. Dask is trivial to setup and, compared to Celery, has less overhead and much lower latency. In February 2017, Jeremiah Lowin contributed a DaskExecutor to the Airflow project. Below I'll walk through setting it up.

First, I'll change the executor setting to DaskExecutor.

$ vi ~/airflow/airflow.cfg
[core]
executor = DaskExecutor

Before I can launch any services the ~/airflow/config folder needs to be in Python's path in order to get picked up by the rest of Airflow.

$ export PYTHONPATH=$HOME/airflow/config:$PYTHONPATH

In production I run Airflow's services via Supervisor but if you want to test out the above in a development environment I recommend using screens to keep Airflow's services running in the background.

Below I'll launch the Dask.distributed scheduler in a screen.

$ screen
$ dask-scheduler

Type CTRL-a and then CTRL-d to detach from the screen leaving it running in the background.

I'll then launch a Dask.distributed worker in another screen.

$ screen
$ dask-worker 127.0.0.1:8786

Note, if you setup the Python virtual environment on other machines you can launch workers on them creating a cluster for Airflow to run its jobs on.

The following will launch Airflow's Web UI service on TCP port 8080.

$ screen
$ airflow webserver -p 8080

The following will launch Airflow's scheduler.

$ screen
$ airflow scheduler

If you open Airflow's Web UI you can "unpause" the "example_bash_operator" job and manually trigger the job by clicking the play button in the controls section on the right. Log files read via the Web UI should state they're being read off of S3. If you don't see this message it could be the logs haven't yet finished being uploaded.

*** Reading remote log from s3://<your s3 bucket>/airflow-logs/example_bash_operator/run_this_last/2018-06-21T14:52:34.689800/1.log.

Airflow Maintenance DAGs

Robert Sanders of Clairvoyant has a repository containing three Airflow jobs to help keep Airflow operating smoothly. The db-cleanup job will clear out old entries in six of Airflow's database tables. The log-cleanup job will remove log files stored in ~/airflow/logs that are older than 30 days (note this will not affect logs stored on S3) and finally, kill-halted-tasks kills lingering processes running in the background after you've killed off a running job in Airflow's Web UI.

Below I'll create a folder for Airflow's jobs and clone the repository.

$ mkdir -p ~/airflow/dags
$ git clone https://github.com/teamclairvoyant/airflow-maintenance-dags.git \
    ~/airflow/dags/maintenance

If Airflow's scheduler is running, you'll need to restart it in order to pick up these new jobs. Below I'll "unpause" the three jobs so they'll start running on a scheduled basis.

$ for DAG in airflow-db-cleanup \
             airflow-kill-halted-tasks \
             airflow-log-cleanup; do
      airflow unpause $DAG
  done

Create Your Own Airflow Pages

Airflow supports plugins in the form of new operators, hooks, executors, macros, Web UI pages (called views) and menu links. Below I'll walk through creating a simple page that displays the contents of a list of dictionaries in a Airflow UI-themed table.

Below I'll put together the logic for this plugin.

$ mkdir -p ~/airflow/plugins/templates/test_plugin
$ touch ~/airflow/plugins/__init__.py
$ vi ~/airflow/plugins/test_plugin.py
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint
from flask_admin import BaseView, expose


class TestView(BaseView):
    @expose('/')
    def test(self):
        data = [{'column_a': 'Content',
                 'column_b': '123',
                 'column_c': 'Test'}]

        return self.render("test_plugin/test.html", data=data)


admin_view_ = TestView(category="Test Plugin", name="Test View")

blue_print_ = Blueprint("test_plugin",
                        __name__,
                        template_folder='templates',
                        static_folder='static',
                        static_url_path='/static/test_plugin')


class AirflowTestPlugin(AirflowPlugin):
    name             = "test_plugin"
    admin_views      = [admin_view_]
    flask_blueprints = [blue_print_]

Below is the Jinja-formatted template which will render the HTML, CSS and JS code needed to display the plugin's page.

$ vi ~/airflow/plugins/templates/test_plugin/test.html
{% extends 'admin/master.html' %}
{% import 'admin/lib.html' as lib with context %}
{% import 'admin/static.html' as admin_static with context%}
{% import 'admin/model/layout.html' as model_layout with context %}

{% block head %}
    {{ super() }}
    {{ lib.form_css() }}
{% endblock %}

{% block body %}
    <h2>Example Heading</h2>
    {% block model_menu_bar %}
    {% endblock %}

    {% block model_list_table %}
    <table class="table table-striped table-bordered table-hover model-list">
        <thead>
            <tr>
                <th class="column-header">Column A</th>
                <th class="column-header">Column B</th>
                <th class="column-header">Column C</th>
            </tr>
        </thead>

        {% for row in data %}
            <tr>
                <td>{{ row.column_a }}</td>
                <td>{{ row.column_b }}</td>
                <td>{{ row.column_c }}</td>
            </tr>
        {% endfor %}
    </table>
    {% endblock %}

{% endblock %}

{% block tail %}
    {{ super() }}
    <script src="{{ admin_static.url(filename='admin/js/filters.js') }}"></script>
    {{ lib.form_js() }}
{% endblock %}

You'll need to restart Airflow's webserver in order for the plugin to be picked up. You should see a menu item in Airflow's top navigation called "Test Plugin". There will be a sub-navigation link called "Test View" that will launch the plugin.

To get an idea of how a more featureful plugin could be put together, have a look at the Hive Metastore Browser plugin that's kept in Airflow's contrib folder.

Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

Copyright © 2014 - 2017 Mark Litwintschik. This site's template is based off a template by Giulio Fidente.