I recently came across a blog post on how Ben Downling started IPInfo and it reminded me of a blog post I did in 2014 called Collecting all IPv4 WHOIS records in Python.
In the post, I tried to cover the entire IPv4 address space with as few WHOIS calls as possible. I came up with a piece of code that would start 8 threads that would each crawl a separate portion of the IPv4 address space. Each time they got a record back the block range would be examined and the next lookup would be after that range.
I wondered if I could use a MapReduce job on AWS EMR to speed this process up. In this blog post I'll walk through the steps I took to see how well a Hadoop job on a cluster of 40 machines can perform with a network-bound problem.
Can't you just download the data from the Registries?
For the most part AFRINIC, APNIC, ARIN, LACNIC and RIPE NNC will provide downloadable copies of their databases if the intended use of that data meets their acceptable usage policies. If you're wanting to use the data to resolve internet operational issues, perform research and the like then you may be granted access to their datasets.
Unfortunately, this task will involve filling in forms, sending faxes and emails and doing a lot of back and forth before getting files that probably don't conform to the exact same format and could have varying degrees of data quality.
ARIN does publish daily listings of active IPv4 registrations but this data only includes when the last change was made, the country of assignment and the IPv4 address range itself. On top of that, only the ARIN-managed addresses are kept up to date. The address ranges for AFRINIC, APNIC, LACNIC and RIPE NNC were last updated on December 12th, 2013.
The ipwhois python package from Philip Hane allows you to make WHOIS requests against the five registration RDAP interfaces and goes out of it way to normalise the information returned. The metadata returned often includes postal addresses, phone numbers and email addresses of the organisations the addresses have been assigned to. Beyond getting up-to-date assignment details the additional metadata could be very useful for conducting research into IPv4 allocations around the world.
What Allocation Sizes are Most Common?
My plan is to generate a list of IP addresses and use them in a Hadoop job. There will be 40 nodes in the cluster so they'll each be assigned a portion of the whole list. There are ~4 billion IPv4 addresses and all those lookups could take a very long time. I suspect if I can look at a small subsection of the IPv4 space I can use that data and find out how much of the spectrum is unaccounted for. To pick a granularity to use I'll inspect the last known allocation sizes of each of the five registries.
The following was run on a fresh installation of Ubuntu 14.04.3 LTS.
I'll download the latest listings for each of the five registries.
wget ftp://ftp.arin.net/pub/stats/afrinic/delegated-afrinic-20131213 wget ftp://ftp.arin.net/pub/stats/apnic/delegated-apnic-20131213 wget ftp://ftp.arin.net/pub/stats/arin/delegated-arin-extended-20160331 wget ftp://ftp.arin.net/pub/stats/lacnic/delegated-lacnic-20131213 wget ftp://ftp.arin.net/pub/stats/ripencc/delegated-ripencc-20131212
I'll install Python, PostgreSQL and a few other dependencies:
$ echo "deb http://apt.postgresql.org/pub/repos/apt/ trusty-pgdg main 9.5" | \ sudo tee /etc/apt/sources.list.d/postgresql.list $ gpg --keyserver pgp.mit.edu --recv-keys 7FCC7D46ACCC4CF8 $ gpg --armor --export 7FCC7D46ACCC4CF8 | sudo apt-key add - $ sudo apt update $ sudo apt install \ postgresql-9.5 \ postgresql-server-dev-9.5 \ python-dev \ python-pip \ python-virtualenv
I'll create a virtual environment and install three python modules.
$ virtualenv allocations $ source allocations/bin/activate $ pip install \ iptools \ netaddr \ psycopg2
I'll add my Linux account to PostgreSQL's list of super users.
$ sudo su - postgres -c \ "createuser --pwprompt --superuser mark"
I'll create a database in PostgreSQL with a table to store the data from each of the five sources.
$ createdb ips $ psql ips
CREATE TYPE REGISTRY AS ENUM ('arin', 'ripencc', 'apnic', 'lacnic', 'afrinic'); CREATE TYPE STATUS AS ENUM ('assigned', 'allocated', 'reserved'); CREATE TABLE ips ( ip_id SERIAL, registry REGISTRY, country VARCHAR(2), address CIDR, number_ips INTEGER, date_allocated DATE, status STATUS, CONSTRAINT pk_ips PRIMARY KEY (ip_id) );
I'll use a Python script to run an ETL job that will take all the data from the files, pull out the IPv4-specific records and load them into the ips table in PostgreSQL.
from iptools.ipv4 import ip2long, long2ip from netaddr import iprange_to_cidrs import psycopg2 def get_records(filename): with open(filename) as f: for line in f.read().split('\n'): if line and len(line.strip()) and line.strip() == '#': continue parts = [part.strip() for part in line.split('|')] if len(parts) < 7: continue yield parts[:7] # Skip MD5 field in ARIN records def get_cidr(first_ip, num_ips): """ The data sets will tell me the first IP address and how many IPs there are. I'll convert this into CIDR format so it can be stored as a CIDR type in PostgreSQL. """ last_ip = long2ip(ip2long(first_ip) + int(num_ips)) # Cast the list of IPNetwork objects to a list of strings return [str(cidr) for cidr in iprange_to_cidrs(first_ip, last_ip)] files = [ 'delegated-afrinic-20131213.txt', 'delegated-apnic-20131213.txt', 'delegated-arin-extended-20160331.txt', 'delegated-lacnic-20131213.txt', 'delegated-ripencc-20131212.txt', ] pg_dsn = 'postgresql://mark:test@localhost:5432/ips' insert_stmt = """INSERT INTO ips (registry, country, address, number_ips, date_allocated, status) VALUES (%s, %s, %s, %s, %s, %s);""" with psycopg2.connect(pg_dsn) as pg_conn: pg_cur = pg_conn.cursor() for filename in files: for (rir, country, record_type, address, size, date_allocated, status) in \ get_records(filename): if record_type != 'ipv4' or len(date_allocated) != 8: continue for cidr in get_cidr(address, size): record = [rir, country, cidr, int(size), '%s-%s-%s' % (date_allocated[0:4], date_allocated[4:6], date_allocated[6:8]), status] pg_cur.execute(insert_stmt, record)
I can see there are 280,975 records in the ips table:
$ echo 'select count(*) from ips;' | psql ips
count -------- 280975 (1 row)
I'll create some indices that should help speed up analytic queries.
$ psql ips
CREATE INDEX rir_idx ON ips (registry); CREATE INDEX country_idx ON ips (country); CREATE INDEX number_ips_idx ON ips (number_ips); CREATE INDEX date_allocated_idx ON ips (date_allocated); CREATE INDEX status_idx ON ips (status);
Assigned Block Sizes
Excluding those by RIPE NNC, most IPv4 address assignments are rarely very granular. I was unable to find an assignment of fewer than 256 addresses among all other registries. This doesn't mean end-user assignments will be this large but it does show that skipping large blocks of IPv4 space when trying to scan the entire spectrum isn't completely lossy.
SELECT registry, number_ips, COUNT(*) FROM ips WHERE registry != 'ripencc' GROUP BY 1, 2 ORDER BY 1, 2;
registry | number_ips | count ----------+------------+------- arin | 256 | 47324 arin | 512 | 8236 arin | 1024 | 13010 arin | 2048 | 7272 arin | 4096 | 10384 arin | 8192 | 7082 arin | 16384 | 3696 arin | 32768 | 1986 arin | 65536 | 12618 arin | 131072 | 994 arin | 262144 | 664 arin | 524288 | 334 arin | 1048576 | 230 arin | 2097152 | 108 arin | 4194304 | 48 arin | 8388608 | 14 arin | 16777216 | 60 apnic | 256 | 12578 apnic | 512 | 4088 apnic | 1024 | 8448 apnic | 2048 | 3382 apnic | 4096 | 3798 apnic | 8192 | 3464 apnic | 16384 | 1900 apnic | 32768 | 1410 apnic | 65536 | 3306 apnic | 131072 | 1248 apnic | 262144 | 892 apnic | 524288 | 450 apnic | 1048576 | 240 apnic | 2097152 | 98 apnic | 4194304 | 40 apnic | 8388608 | 2 apnic | 16777216 | 4 lacnic | 256 | 2100 lacnic | 512 | 430 lacnic | 1024 | 2090 lacnic | 2048 | 2078 lacnic | 4096 | 4134 lacnic | 8192 | 1494 lacnic | 16384 | 768 lacnic | 32768 | 580 lacnic | 65536 | 1006 lacnic | 131072 | 336 lacnic | 262144 | 354 lacnic | 524288 | 30 lacnic | 1048576 | 24 lacnic | 2097152 | 6 afrinic | 256 | 1388 afrinic | 512 | 149 afrinic | 768 | 21 afrinic | 1024 | 925 afrinic | 1280 | 36 afrinic | 1536 | 24 afrinic | 1792 | 20 afrinic | 2048 | 408 afrinic | 2304 | 15 afrinic | 2560 | 41 afrinic | 2816 | 4 afrinic | 3072 | 6 afrinic | 4096 | 473 afrinic | 5120 | 17 afrinic | 7680 | 9 afrinic | 7936 | 6 afrinic | 8192 | 517 afrinic | 8960 | 4 afrinic | 12800 | 17 afrinic | 16384 | 224 afrinic | 24576 | 3 afrinic | 25600 | 10 afrinic | 32768 | 98 afrinic | 65536 | 354 afrinic | 131072 | 69 afrinic | 196608 | 3 afrinic | 262144 | 48 afrinic | 393216 | 3 afrinic | 524288 | 34 afrinic | 1048576 | 20 afrinic | 2097152 | 8
RIPE NNC on the other hand has very granular assignments with large numbers of cases within each:
registry | number_ips | count ----------+------------+------- ripencc | 8 | 30 ripencc | 16 | 26 ripencc | 32 | 120 ripencc | 48 | 3 ripencc | 64 | 126 ripencc | 96 | 3 ripencc | 128 | 176 ripencc | 192 | 3 ripencc | 256 | 28458 ripencc | 384 | 6 ripencc | 512 | 10449 ripencc | 640 | 4 ripencc | 768 | 498 ripencc | 1024 | 12591 ripencc | 1120 | 4 ripencc | 1152 | 5 ripencc | 1280 | 229 ripencc | 1536 | 263 ripencc | 1792 | 80 ripencc | 2048 | 13419 ripencc | 2304 | 47 ripencc | 2560 | 142 ripencc | 2816 | 41 ripencc | 3072 | 128 ripencc | 3328 | 25 ripencc | 3584 | 20 ripencc | 3840 | 40 ripencc | 4096 | 9447 ripencc | 4352 | 23 ...
Why Not Use One Machine & IP?
It's a valid point that one computer on one IP address possibly could perform this job. To find out how well it would perform I generated a file of 1,000 random IP addresses (1000_ips.txt) and used a pool of 40 workers to perform WHOIS queries.
$ pip install eventlet
from eventlet import * patcher.monkey_patch(all=True) from ipwhois import IPWhois def whois(ip_address): obj = IPWhois(ip_address, timeout=10) results = obj.lookup_rdap(depth=1) print results if __name__ == "__main__": pool = GreenPool(size=40) ip_addresses = open('1000_ips.txt').read().split('\n') for ip_address in ip_addresses: pool.spawn_n(whois, ip_address) pool.waitall()
The task took 11 minutes and 58 seconds to complete on my machine. I occasionally got an HTTPLookupError exception which wasn't the end of the world but then I also saw the following:
HTTPRateLimitError: HTTP lookup failed for http://rdap.lacnic.net/rdap/ip/x.x.x.x. Rate limit exceeded, wait and try again (possibly a temporary block).
If I could use more than one IP address I could avoid these exceptions for longer.
Generating A List of IPs
My plan is to generate ~4-5 million IPv4 addresses that will be used as a first pass. Once I've collected all the WHOIS records I can then see how many black spots are remaining in the IPv4 spectrum. I'll run a Python script to generate this list. I'll do some basic exclusions like skipping multicast, reversed and loopback ranges and skips some well known /8 allocations.
$ pip install ipaddr
import json import ipaddr def is_reserved(ip): return (ipaddr.IPv4Network(ip).is_multicast | ipaddr.IPv4Network(ip).is_private | ipaddr.IPv4Network(ip).is_link_local | ipaddr.IPv4Network(ip).is_loopback | ipaddr.IPv4Network(ip).is_reserved) def get_ips(): """ This will return 4,706,768 addresses. """ for class_a in range(1, 256): if class_a in (3, 9, 10, 12, 15, 16, 17, 18, 19, 20, 34, 48, 56, 127): continue for class_b in range(0, 256): for class_c in range(0, 256, 12): for class_d in range(1, 256, 64): ip = '%d.%d.%d.%d' % (class_a, class_b, class_c, class_d) if not is_reserved(ip): yield ip ips = [ip for ip in get_ips()] with open('4m_ips.txt', 'w') as output_file: output_file.write('\n'.join(ips))
The resulting file is 63 MB uncompressed and contains 4,706,768 IPv4 addresses.
IPv4 WHOIS MapReduce Job
I'll use the MRJob library from Yelp to create my MapReduce job in Python.
$ pip install mrjob
$ mkdir job $ cd job $ vi runner.py
import json from ipwhois import IPWhois from mrjob.job import MRJob from mrjob.step import MRStep class GetWhoisRecords(MRJob): def mapper(self, _, line): try: obj = IPWhois(line, timeout=10) results = obj.lookup_rdap(depth=1) self.increment_counter('whois_operation', 'got_result', 1) yield (json.dumps(results), 1) except Exception as exc: self.increment_counter('whois_operation', 'no_result', 1) yield (None, 1) def steps(self): return [ MRStep(mapper=self.mapper), ] if __name__ == '__main__': GetWhoisRecords.run()
I can then test this script locally with two IP addresses to see that it can run properly.
$ echo -e '22.214.171.124\n126.96.36.199' | \ python runner.py
Launching 40 Nodes on EMR
I'll supply my AWS credentials and make them available via environment variables.
$ read AWS_ACCESS_KEY_ID $ read AWS_SECRET_ACCESS_KEY $ export AWS_ACCESS_KEY_ID $ export AWS_SECRET_ACCESS_KEY
I'll then download a file that will install pip properly on each node in the cluster. This file will be uploaded automatically when the cluster is launched.
$ wget -c https://bootstrap.pypa.io/get-pip.py
I've created a key pair in the AWS console called emr.pem and stored in in the ~/.ssh/ directory on my machine.
I'll then setup MRJob's configuration.
$ vi mrjob.conf
runners: emr: ami_version: 3.6.0 aws_region: eu-west-1 num_ec2_instances: 40 ec2_key_pair_file: ~/.ssh/emr.pem ec2_master_instance_type: c3.xlarge ec2_master_instance_bid_price: '0.05' ec2_instance_type: c3.xlarge ec2_core_instance_bid_price: '0.05' interpreter: python2.7 bootstrap: - sudo yum install python27 python27-devel gcc-c++ - sudo python2.7 get-pip.py# - sudo pip2.7 install boto mrjob ipwhois
The above configuration uses a slightly old but well-tested AMI disk image.
The bootstrap commands will install Python, PIP and three Python libraries. Boto is used by MRJob to store the output of each map operation onto S3. IPWhois is the Python library that will perform the WHOIS operations and return a well-structured dictionary of the results.
Each node will be a c3.xlarge which appears to be one of the smallest and cheapest spot instance you can ask for on EMR. Each has 4 vCPUs, 7.5 GB of memory and 2 40GB SSDs. It's overkill but they're $0.05 / hour each.
The total cost of the 40 spot instances is at most $2.00 / hour. There will also be an additional fee for using the EMR service; I don't know the exact amount but in my past experience it was around 30% of whatever I spent on the EC2 instances.
It's probably not wise to use a spot instance for the master node, if it goes, so does the rest of the job.
Walk Before You Run
To scope out any potential issues with this job I'll break the list of IP addresses up into files of 250K IPs each. That way I can see how long a smaller job will take and see any issues earlier than I would otherwise.
$ sort -R 4m_ips.txt | \ split --lines 250000 - ips_
I'll then run an exploratory job using the first 250K file.
$ python runner.py \ -r emr \ --conf-path mrjob.conf \ --no-output \ --output-dir s3://<s3_bucket>/run1 \ ips_aa
If you run the above command change the S3 bucket to one you have access to.
MapReduce Job Result
The job died after 2 hours, 7 minutes. There were 17 mappers remaining in the end, each at various stages along their assigned ~1,600 IPs. I suspect these machines might have been temporarily blocked and unable to finish their tasks.
There were a large number of mappers that did finish their tasks and I was able to download those results off S3.
$ s3cmd get --recursive s3://<s3_bucket>/run1/
The results were 858 MB uncompressed and represented 235,532 lines of JSON containing WHOIS details. To only miss out on 14,468 lookups isn't bad in my opinion.
Not the Fastest, Nor the Most Fault-Tolerant
I decided to not continue with the remaining ~4.4 million lookups using this approach.
Network-bound problems aren't a great space for Hadoop to work in. It was nice that I didn't need to write much code to get this job up and running but the benchmark I did on my machine with 1,000 IPs shows the cluster was underutilised.
If I were to attempt this task again I'd create a Django app. It would run on each node in a cluster and run each WHOIS query in a celery task. I'd fan out blocks of IP Addresses to each node. If there was an exception I would mark the task to re-try at a later point and give up after a few attempts. I would also use a registry-specific rate-limiting system.