I recently came across a blog post on how Ben Dowling started IPInfo and it reminded me of a blog post I wrote in 2014 called Collecting all IPv4 WHOIS records in Python.
In that post, I tried to cover the entire IPv4 address space with as few WHOIS calls as possible. I came up with a piece of code that started 8 threads, each crawling a separate portion of the IPv4 address space. Each time a record came back, the block range it described was examined and the next lookup started just past the end of that range.
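Below is a rough sketch of that skip-ahead idea. This is illustrative code rather than the 2014 original; it uses ipwhois' RDAP output to find each block's extent.

from ipwhois import IPWhois
from iptools.ipv4 import ip2long, long2ip
from netaddr import IPNetwork


def crawl(start_ip, end_ip):
    # Walk one slice of the IPv4 space; each of the 8 threads would run
    # a loop like this over its own slice.
    ip = ip2long(start_ip)

    while ip <= ip2long(end_ip):
        results = IPWhois(long2ip(ip), timeout=10).lookup_rdap(depth=1)
        yield results

        # The record tells us how far the allocated block extends, so
        # the next lookup starts one address past the end of the block.
        cidrs = results['network']['cidr'].split(',')
        ip = max(IPNetwork(cidr.strip()).last for cidr in cidrs) + 1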
I wondered if I could use a MapReduce job on AWS EMR to speed this process up. In this blog post I'll walk through the steps I took to see how well a Hadoop job on a cluster of 40 machines can perform with a network-bound problem.
Can't you just download the data from the Registries?
For the most part AFRINIC, APNIC, ARIN, LACNIC and RIPE NCC will provide downloadable copies of their databases if your intended use of that data meets their acceptable usage policies. If you want to use the data to resolve internet operational issues, perform research and the like then you may be granted access to their datasets.
Unfortunately, this involves filling in forms, sending faxes and emails and doing a lot of back and forth before getting files that probably don't all conform to the same format and could have varying degrees of data quality.
ARIN does publish daily listings of active IPv4 registrations but this data only includes when the last change was made, the country of assignment and the IPv4 address range itself. On top of that, only the ARIN-managed addresses are kept up to date. The address ranges for AFRINIC, APNIC, LACNIC and RIPE NCC were last updated in December 2013.
The ipwhois Python package from Philip Hane allows you to make WHOIS requests against the five registries' RDAP interfaces and goes out of its way to normalise the information returned. The metadata returned often includes postal addresses, phone numbers and email addresses of the organisations the addresses have been assigned to. Beyond getting up-to-date assignment details the additional metadata could be very useful for conducting research into IPv4 allocations around the world.
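As a taste of what the package returns (the field names below are from ipwhois' RDAP output; the example values are illustrative):

from ipwhois import IPWhois

results = IPWhois('8.8.8.8', timeout=10).lookup_rdap(depth=1)

print results['asn']              # e.g. '15169'
print results['network']['cidr']  # e.g. '8.8.8.0/24'
print results['network']['name']  # The network's registered name

# Contact details for the organisations involved live under
# results['objects'], keyed by their RDAP handles.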
What Allocation Sizes are Most Common?
My plan is to generate a list of IP addresses and use them in a Hadoop job. There will be 40 nodes in the cluster, so each will be assigned a portion of the whole list. There are ~4 billion IPv4 addresses and looking up all of them could take a very long time. I suspect that if I look up a small subsection of the IPv4 space I can use that data to find out how much of the spectrum is unaccounted for. To pick a granularity I'll inspect the last known allocation sizes used by each of the five registries.
The following was run on a fresh installation of Ubuntu 14.04.3 LTS.
I'll download the latest listings for each of the five registries.
wget ftp://ftp.arin.net/pub/stats/afrinic/delegated-afrinic-20131213
wget ftp://ftp.arin.net/pub/stats/apnic/delegated-apnic-20131213
wget ftp://ftp.arin.net/pub/stats/arin/delegated-arin-extended-20160331
wget ftp://ftp.arin.net/pub/stats/lacnic/delegated-lacnic-20131213
wget ftp://ftp.arin.net/pub/stats/ripencc/delegated-ripencc-20131212
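These files use the RIRs' statistics exchange format: pipe-delimited records of registry, country code, record type, first address, address count, allocation date and status. ARIN's extended format appends an opaque ID as an eighth field. The two lines below are representative examples rather than records copied verbatim from the files:

apnic|JP|ipv4|1.0.16.0|4096|20110412|allocated
arin|US|ipv4|23.20.0.0|1048576|20110513|allocated|<opaque-id>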
I'll install Python, PostgreSQL and a few other dependencies:
$ echo "deb http://apt.postgresql.org/pub/repos/apt/ trusty-pgdg main 9.5" | \
sudo tee /etc/apt/sources.list.d/postgresql.list
$ gpg --keyserver pgp.mit.edu --recv-keys 7FCC7D46ACCC4CF8
$ gpg --armor --export 7FCC7D46ACCC4CF8 | sudo apt-key add -
$ sudo apt update
$ sudo apt install \
postgresql-9.5 \
postgresql-server-dev-9.5 \
python-dev \
python-pip \
python-virtualenv
I'll create a virtual environment and install three Python modules.
$ virtualenv allocations
$ source allocations/bin/activate
$ pip install \
    iptools \
    netaddr \
    psycopg2
I'll add my Linux account to PostgreSQL's list of superusers.
$ sudo su - postgres -c \
    "createuser --pwprompt --superuser mark"
I'll create a database in PostgreSQL with a table to store the data from each of the five sources.
$ createdb ips
$ psql ips
CREATE TYPE REGISTRY AS ENUM ('arin', 'ripencc', 'apnic', 'lacnic', 'afrinic');
CREATE TYPE STATUS AS ENUM ('assigned', 'allocated', 'reserved');
CREATE TABLE ips (
    ip_id SERIAL,
    registry REGISTRY,
    country VARCHAR(2),
    address CIDR,
    number_ips INTEGER,
    date_allocated DATE,
    status STATUS,
    CONSTRAINT pk_ips PRIMARY KEY (ip_id)
);
I'll use a Python script to run an ETL job that will take all the data from the files, pull out the IPv4-specific records and load them into the ips table in PostgreSQL.
from iptools.ipv4 import ip2long, long2ip
from netaddr import iprange_to_cidrs
import psycopg2


def get_records(filename):
    with open(filename) as f:
        for line in f.read().split('\n'):
            # Skip blank lines and comments.
            if not line.strip() or line.strip().startswith('#'):
                continue

            parts = [part.strip()
                     for part in line.split('|')]

            if len(parts) < 7:
                continue

            yield parts[:7]  # Skip the opaque ID field in ARIN's extended records
def get_cidr(first_ip, num_ips):
    """
    The data sets tell me the first IP address of each block and how many
    IPs it contains. I'll convert this into CIDR format so it can be
    stored as a CIDR type in PostgreSQL.
    """
    # The block is inclusive of the first address, so the last address
    # is first + count - 1.
    last_ip = long2ip(ip2long(first_ip) + int(num_ips) - 1)

    # Cast the list of IPNetwork objects to a list of strings.
    return [str(cidr)
            for cidr in iprange_to_cidrs(first_ip, last_ip)]
files = [
    'delegated-afrinic-20131213',
    'delegated-apnic-20131213',
    'delegated-arin-extended-20160331',
    'delegated-lacnic-20131213',
    'delegated-ripencc-20131212',
]
pg_dsn = 'postgresql://mark:test@localhost:5432/ips'

insert_stmt = """INSERT INTO ips (registry,
                                  country,
                                  address,
                                  number_ips,
                                  date_allocated,
                                  status)
                 VALUES (%s, %s, %s, %s, %s, %s);"""
with psycopg2.connect(pg_dsn) as pg_conn:
    pg_cur = pg_conn.cursor()

    for filename in files:
        for (rir, country, record_type, address,
             size, date_allocated, status) in get_records(filename):
            if record_type != 'ipv4' or len(date_allocated) != 8:
                continue

            for cidr in get_cidr(address, size):
                record = [rir,
                          country,
                          cidr,
                          int(size),
                          '%s-%s-%s' % (date_allocated[0:4],
                                        date_allocated[4:6],
                                        date_allocated[6:8]),
                          status]
                pg_cur.execute(insert_stmt, record)
I can see there are 280,975 records in the ips table:
$ echo 'select count(*) from ips;' | psql ips
count
--------
280975
(1 row)
I'll create some indices that should help speed up analytic queries.
$ psql ips
CREATE INDEX rir_idx ON ips (registry);
CREATE INDEX country_idx ON ips (country);
CREATE INDEX number_ips_idx ON ips (number_ips);
CREATE INDEX date_allocated_idx ON ips (date_allocated);
CREATE INDEX status_idx ON ips (status);
Assigned Block Sizes
Excluding RIPE NCC, IPv4 address assignments are rarely very granular. I was unable to find an assignment of fewer than 256 addresses among any of the other registries. This doesn't mean end-user assignments will be this large but it does show that stepping through IPv4 space in large increments when trying to scan the entire spectrum won't lose much.
SELECT registry, number_ips, COUNT(*)
FROM ips
WHERE registry != 'ripencc'
GROUP BY 1, 2
ORDER BY 1, 2;
registry | number_ips | count
----------+------------+-------
arin | 256 | 47324
arin | 512 | 8236
arin | 1024 | 13010
arin | 2048 | 7272
arin | 4096 | 10384
arin | 8192 | 7082
arin | 16384 | 3696
arin | 32768 | 1986
arin | 65536 | 12618
arin | 131072 | 994
arin | 262144 | 664
arin | 524288 | 334
arin | 1048576 | 230
arin | 2097152 | 108
arin | 4194304 | 48
arin | 8388608 | 14
arin | 16777216 | 60
apnic | 256 | 12578
apnic | 512 | 4088
apnic | 1024 | 8448
apnic | 2048 | 3382
apnic | 4096 | 3798
apnic | 8192 | 3464
apnic | 16384 | 1900
apnic | 32768 | 1410
apnic | 65536 | 3306
apnic | 131072 | 1248
apnic | 262144 | 892
apnic | 524288 | 450
apnic | 1048576 | 240
apnic | 2097152 | 98
apnic | 4194304 | 40
apnic | 8388608 | 2
apnic | 16777216 | 4
lacnic | 256 | 2100
lacnic | 512 | 430
lacnic | 1024 | 2090
lacnic | 2048 | 2078
lacnic | 4096 | 4134
lacnic | 8192 | 1494
lacnic | 16384 | 768
lacnic | 32768 | 580
lacnic | 65536 | 1006
lacnic | 131072 | 336
lacnic | 262144 | 354
lacnic | 524288 | 30
lacnic | 1048576 | 24
lacnic | 2097152 | 6
afrinic | 256 | 1388
afrinic | 512 | 149
afrinic | 768 | 21
afrinic | 1024 | 925
afrinic | 1280 | 36
afrinic | 1536 | 24
afrinic | 1792 | 20
afrinic | 2048 | 408
afrinic | 2304 | 15
afrinic | 2560 | 41
afrinic | 2816 | 4
afrinic | 3072 | 6
afrinic | 4096 | 473
afrinic | 5120 | 17
afrinic | 7680 | 9
afrinic | 7936 | 6
afrinic | 8192 | 517
afrinic | 8960 | 4
afrinic | 12800 | 17
afrinic | 16384 | 224
afrinic | 24576 | 3
afrinic | 25600 | 10
afrinic | 32768 | 98
afrinic | 65536 | 354
afrinic | 131072 | 69
afrinic | 196608 | 3
afrinic | 262144 | 48
afrinic | 393216 | 3
afrinic | 524288 | 34
afrinic | 1048576 | 20
afrinic | 2097152 | 8
RIPE NCC, on the other hand, has very granular assignments, with large numbers of records at each size:
registry | number_ips | count
----------+------------+-------
ripencc | 8 | 30
ripencc | 16 | 26
ripencc | 32 | 120
ripencc | 48 | 3
ripencc | 64 | 126
ripencc | 96 | 3
ripencc | 128 | 176
ripencc | 192 | 3
ripencc | 256 | 28458
ripencc | 384 | 6
ripencc | 512 | 10449
ripencc | 640 | 4
ripencc | 768 | 498
ripencc | 1024 | 12591
ripencc | 1120 | 4
ripencc | 1152 | 5
ripencc | 1280 | 229
ripencc | 1536 | 263
ripencc | 1792 | 80
ripencc | 2048 | 13419
ripencc | 2304 | 47
ripencc | 2560 | 142
ripencc | 2816 | 41
ripencc | 3072 | 128
ripencc | 3328 | 25
ripencc | 3584 | 20
ripencc | 3840 | 40
ripencc | 4096 | 9447
ripencc | 4352 | 23
...
Why Not Use One Machine & IP?
It's a valid point that one computer on one IP address could possibly perform this job. To find out how well it would perform, I generated a file of 1,000 random IP addresses (1000_ips.txt) and used a pool of 40 green threads to perform WHOIS queries.
$ pip install eventlet
from eventlet import *
patcher.monkey_patch(all=True)

from ipwhois import IPWhois


def whois(ip_address):
    obj = IPWhois(ip_address, timeout=10)
    results = obj.lookup_rdap(depth=1)
    print results


if __name__ == "__main__":
    pool = GreenPool(size=40)

    ip_addresses = open('1000_ips.txt').read().split('\n')

    for ip_address in ip_addresses:
        pool.spawn_n(whois, ip_address)

    pool.waitall()
The task took 11 minutes and 58 seconds to complete on my machine. I occasionally got an HTTPLookupError exception, which wasn't the end of the world, but then I also saw the following:
HTTPRateLimitError: HTTP lookup failed for http://rdap.lacnic.net/rdap/ip/x.x.x.x.
Rate limit exceeded, wait and try again (possibly a temporary block).
If I could use more than one IP address I could avoid these exceptions for longer.
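Short of adding IP addresses, the next best thing on a single machine is backing off when the rate limit is hit. Here's a minimal sketch; the attempt count and sleep times are arbitrary:

import time

from ipwhois import IPWhois
from ipwhois.exceptions import HTTPRateLimitError


def whois_with_backoff(ip_address, max_attempts=3):
    # Retry rate-limited lookups, doubling the wait on each attempt.
    for attempt in range(max_attempts):
        try:
            return IPWhois(ip_address, timeout=10).lookup_rdap(depth=1)
        except HTTPRateLimitError:
            time.sleep(30 * 2 ** attempt)

    return None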
Generating A List of IPs
My plan is to generate ~4-5 million IPv4 addresses that will be used as a first pass. Once I've collected all the WHOIS records I can then see how many black spots remain in the IPv4 spectrum. I'll run a Python script to generate this list. It makes some basic exclusions, skipping multicast, private, link-local, loopback and reserved ranges as well as some well-known /8 allocations.
$ pip install ipaddr
import ipaddr


def is_reserved(ip):
    network = ipaddr.IPv4Network(ip)
    return (network.is_multicast or
            network.is_private or
            network.is_link_local or
            network.is_loopback or
            network.is_reserved)


def get_ips():
    """
    This will yield 4,706,768 addresses.
    """
    for class_a in range(1, 256):
        # Skip some well-known /8 allocations.
        if class_a in (3, 9, 10, 12, 15, 16, 17, 18, 19, 20, 34, 48, 56, 127):
            continue

        for class_b in range(0, 256):
            for class_c in range(0, 256, 12):  # Every 12th /24
                for class_d in range(1, 256, 64):  # 4 addresses per /24
                    ip = '%d.%d.%d.%d' % (class_a, class_b, class_c, class_d)

                    if not is_reserved(ip):
                        yield ip


ips = list(get_ips())

with open('4m_ips.txt', 'w') as output_file:
    output_file.write('\n'.join(ips))
The resulting file is 63 MB uncompressed and contains 4,706,768 IPv4 addresses.
IPv4 WHOIS MapReduce Job
I'll use the MRJob library from Yelp to create my MapReduce job in Python.
$ pip install mrjob
$ mkdir job
$ cd job
$ vi runner.py
import json

from ipwhois import IPWhois
from mrjob.job import MRJob
from mrjob.step import MRStep


class GetWhoisRecords(MRJob):

    def mapper(self, _, line):
        try:
            obj = IPWhois(line, timeout=10)
            results = obj.lookup_rdap(depth=1)
            self.increment_counter('whois_operation', 'got_result', 1)
            yield (json.dumps(results), 1)
        except Exception as exc:
            self.increment_counter('whois_operation', 'no_result', 1)
            yield (None, 1)

    def steps(self):
        return [
            MRStep(mapper=self.mapper),
        ]


if __name__ == '__main__':
    GetWhoisRecords.run()
I can then test this script locally with two IP addresses to see that it can run properly.
$ echo -e '24.24.24.24\n45.45.45.45' | \
    python runner.py
Launching 40 Nodes on EMR
I'll supply my AWS credentials and make them available via environment variables.
$ read AWS_ACCESS_KEY_ID
$ read AWS_SECRET_ACCESS_KEY
$ export AWS_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY
I'll then download a file that will install pip properly on each node in the cluster. This file will be uploaded automatically when the cluster is launched.
$ wget -c https://bootstrap.pypa.io/get-pip.py
I've created a key pair in the AWS console called emr.pem and stored it in the ~/.ssh/ directory on my machine.
I'll then set up MRJob's configuration.
$ vi mrjob.conf
runners:
  emr:
    ami_version: 3.6.0
    aws_region: eu-west-1
    num_ec2_instances: 40
    ec2_key_pair_file: ~/.ssh/emr.pem
    ec2_master_instance_type: c3.xlarge
    ec2_master_instance_bid_price: '0.05'
    ec2_instance_type: c3.xlarge
    ec2_core_instance_bid_price: '0.05'
    interpreter: python2.7
    bootstrap:
    - sudo yum install -y python27 python27-devel gcc-c++
    - sudo python2.7 get-pip.py#
    - sudo pip2.7 install boto mrjob ipwhois
The above configuration uses a slightly old but well-tested AMI disk image.
The bootstrap commands will install Python, pip and three Python libraries; the trailing # after get-pip.py is MRJob's marker for a local file that should be uploaded to each node. boto is used by MRJob to store the output of each map operation on S3. ipwhois is the Python library that will perform the WHOIS operations and return a well-structured dictionary of the results.
Each node will be a c3.xlarge, which appears to be one of the smallest and cheapest spot instances you can ask for on EMR. Each has 4 vCPUs, 7.5 GB of memory and two 40 GB SSDs. It's overkill, but they're $0.05 / hour each.
The total cost of the 40 spot instances is at most $2.00 / hour. There will also be an additional fee for using the EMR service; I don't know the exact amount but in my past experience it was around 30% of whatever I spent on the EC2 instances.
It's probably not wise to use a spot instance for the master node: if it goes, so does the rest of the job.
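In MRJob's configuration the fix is simply to omit the master's bid price; an instance without a bid price is requested on demand. A sketch of the relevant lines:

runners:
  emr:
    ec2_master_instance_type: c3.xlarge
    # No ec2_master_instance_bid_price, so the master runs on demand.
    ec2_instance_type: c3.xlarge
    ec2_core_instance_bid_price: '0.05'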
Walk Before You Run
To scope out any potential issues with this job I'll break the list of IP addresses up into files of 250K IPs each. That way I can see how long a smaller job will take and see any issues earlier than I would otherwise.
$ sort -R 4m_ips.txt | \
    split --lines 250000 - ips_
I'll then run an exploratory job using the first 250K file.
$ python runner.py \
    -r emr \
    --conf-path mrjob.conf \
    --no-output \
    --output-dir s3://<s3_bucket>/run1 \
    ips_aa
If you run the above command, change the S3 bucket to one you have access to.
MapReduce Job Result
The job died after 2 hours and 7 minutes. There were 17 mappers still remaining at the end, each at various stages through their assigned ~1,600 IPs. I suspect these machines had been temporarily blocked and were unable to finish their tasks.
A large number of mappers did finish their tasks, and I was able to download their results off S3.
$ s3cmd get --recursive s3://<s3_bucket>/run1/
The results were 858 MB uncompressed and represented 235,532 lines of JSON containing WHOIS details. Only missing 14,468 of the 250,000 lookups isn't bad in my opinion.
Not the Fastest, Nor the Most Fault-Tolerant
I decided to not continue with the remaining ~4.4 million lookups using this approach.
Network-bound problems aren't a great space for Hadoop to work in. It was nice that I didn't need to write much code to get this job up and running, but the benchmark I did on my machine with 1,000 IPs shows the cluster was underutilised: my single machine managed ~1.4 lookups / second, while the cluster's 235,532 successful lookups over 2 hours and 7 minutes work out to ~31 lookups / second, or less than 0.8 lookups / second per node.
If I were to attempt this task again I'd create a Django app. It would run on each node in a cluster and run each WHOIS query in a Celery task. I'd fan out blocks of IP addresses to each node. If there was an exception I would mark the task to retry at a later point and give up after a few attempts. I would also use a registry-specific rate-limiting system. A sketch of what one of those tasks could look like follows.
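This is illustrative rather than tested code; the broker URL, rate limit and retry delays are placeholders, while Celery's retry and rate_limit mechanics are real:

from celery import Celery
from ipwhois import IPWhois
from ipwhois.exceptions import HTTPRateLimitError

app = Celery('whois', broker='redis://localhost:6379/0')


@app.task(bind=True, max_retries=3, rate_limit='30/m')
def whois_lookup(self, ip_address):
    try:
        return IPWhois(ip_address, timeout=10).lookup_rdap(depth=1)
    except HTTPRateLimitError as exc:
        # Mark the task to retry later with an increasing delay; Celery
        # gives up after max_retries attempts.
        raise self.retry(exc=exc, countdown=60 * (self.request.retries + 1))

Note that Celery's rate_limit is applied per worker and per task type, so a truly registry-specific scheme would need a separate task or queue for each registry.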