Apathy of the Commons

Eight years ago, I filed a bug on an open-source project.

HADOOP-3733 appeared to be a minor problem with special characters in URLs. I hadn’t bothered to examine the source code, but I assumed it would be an easy fix. Who knows, maybe it would even give some eager young programmer the opportunity to make their first contribution to open-source.

I moved on; I wasn’t using Hadoop day-to-day anymore. About once a year, though, I got a reminder email from JIRA when someone else stumbled across the bug and chimed in. Three patches were submitted, with a brief discussion around each, but the bug remained unresolved. A clumsy workaround was suggested.

Linus’s Law decrees that “given enough eyeballs, all bugs are shallow.” But there’s a corollary: given enough hands, all bugs are trivial. Which is not the same as easy.

The bug I reported clearly affected other people: It accumulated nine votes, making it the fourth-most-voted-on Hadoop ticket. And it seems like something easy to fix: just a simple character-escaping problem, a missed edge case. A beginning Java programmer should be able to fix it, right?

Perhaps that’s why no one wanted to fix it. HADOOP-3733 is not going to give anyone the opportunity to flex their algorithmic muscles or show off to their peers. It’s exactly the kind of tedious, persistent bug that programmers hate. It’s boring. And hey, there’s an easy workaround. Somebody else will fix it, right?

Eventually it was fixed. The final patch touched 12 files and added 724 lines: clearly non-trivial work requiring knowledge of Hadoop internals, a “deep” bug rather than a shallow one.

One day later, someone reported a second bug for the same issue with a different special character.

If there’s a lesson to draw from this, it’s that programming is not just hard, it’s often slow, tedious, and boring. It’s work. When programmers express a desire to contribute to open-source software, we think of grand designs, flashy new tools, and cheering crowds at conferences.

A reward system based on ego satisfaction and reputation optimizes for interesting, novel work. Everyone wants to be the master architect of the groundbreaking new framework in the hip new language. No one wants to dig through dozens of Java files for a years-old parsing bug.

But sometimes that’s the work that needs to be done.

* * *

Edit 2016-07-19: The author of the final patch, Steve Loughran, wrote up his analysis of the problem and its solution: Gardening the Commons. He deserves a lot of credit for being willing to take the (considerable) time needed to dig into the details of such an old bug and then work out a solution that addresses the root cause.

Big & Small at the same time

I haven’t posted in a while — look for more later this summer.

But in the meantime, I have a question: How do you structure data such that you can efficiently manipulate it on both a large scale and a small scale at the same time?

By large scale, I mean applying a transformation or analysis efficiently over every record in a multi-gigabyte collection.  Hadoop is very good at this, but it achieves its efficiency by working with collections in large chunks, typically 64 MB and up.

What you can’t do with Hadoop — at least, not efficiently — is retrieve a single record.  Systems layered on top of Hadoop, like HBase, attempt to mitigate the problem, but they are still slower than, say, a relational database.

In fact, considering the history of storage and data access technologies, most of them have been geared toward efficient random access to individual records — RAM, filesystems, hard disks, RDBMS’s, etc.  But as Hadoop demonstrates, random-access systems tend to be inefficient for handling very large data collections in the aggregate.

This is not merely theoretical musing — it’s a problem I’m trying to solve with AltLaw.  I can use Hadoop to process millions of small records.  The results come out in large Hadoop SequenceFiles.  But then I want to provide random access to those records via the web site.  So I have to somehow “expand” the contents of those SequenceFiles into individual records and store those records in some format that provides efficient random access.

Right now, I use two very blunt instruments — Lucene indexes and plain old files.  In the final stage of my processing chain, metadata and searchable text get written to a Lucene index, and the pre-rendered HTML content of each page gets written to a file on an XFS filesystem.  This works, but it ends up being one of the slower parts of the process.  Building multiple Lucene indexes and merging them into one big (~6 GB) index takes an hour; writing all the files to the XFS filesystem takes about 20 minutes.  There is no interesting data manipulation going on here; I’m just moving data around.
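
For a sense of how mechanical this is, the file-writing half of that step boils down to a loop over the SequenceFile.  A stripped-down sketch (assuming Text file names as keys and BytesWritable contents as values; real code needs directory handling and error recovery):

import java.io.File;
import java.io.FileOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Expand a SequenceFile of (Text name, BytesWritable contents) records
// into one plain file per record under a local output directory.
public class ExpandSequenceFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Reader reader =
                new SequenceFile.Reader(fs, new Path(args[0]), conf);
        Text name = new Text();
        BytesWritable contents = new BytesWritable();
        while (reader.next(name, contents)) {
            try (FileOutputStream out =
                     new FileOutputStream(new File(args[1], name.toString()))) {
                // getBytes() returns a padded buffer; only getLength() bytes are valid.
                out.write(contents.getBytes(), 0, contents.getLength());
            }
        }
        reader.close();
    }
}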

Antidenormalizationism

When storing any large collection of data, one of the most critical decisions one has to make is when to normalize and when to denormalize.  Normalized data is good for flexibility — you can write queries to recombine things in any combination.  Denormalized data is more efficient when you know, in advance, what the queries will be.  There’s a maxim, “normalize until it hurts, denormalize until it works.”

In the past two years, I’ve become a big fan of three software paradigms:

  1. RDF and the Semantic Web
  2. RESTful architectures
  3. Hadoop and MapReduce

The first two were made for each other — RDF is the ideal data format for the Semantic Web, and Semantic Web applications are (ideally) implemented with RESTful architectures.

Unfortunately, as I discovered through painful experience, RDF is an inefficient representation for large amounts of data.  It’s the ultimate in normalization — each triple represents a single fact — and in flexibility, but even simple queries become prohibitively slow when each item of interest is spread across dozens of triples.

At the other end of the spectrum, MapReduce programming forces you into an entirely different way of thinking.  MapReduce offers exactly one data access mechanism: read a list of records from beginning to end.  No random access, do not pass GO, do not collect $200.  That restriction is what enables Hadoop to distribute a job across many machines, and to process data at the maximum rate supported by the hard disk.  Obviously, to take full advantage of these optimizations, your MapReduce program needs to be able to process each record in isolation, without referring to any other resources.  In effect, everything has to be completely denormalized.
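
To make the constraint concrete, here is a minimal mapper written against Hadoop’s mapred API.  It does nothing interesting (it just emits each record paired with its length), but every MapReduce step has to fit this shape: one record in, some output out, nothing else visible:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Each map() call sees exactly one record and nothing else: no lookups,
// no joins, no shared state.  Anything a record needs has to be inside it.
public class RecordLengthMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> {

    public void map(LongWritable offset, Text record,
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
        output.collect(record, new LongWritable(record.getLength()));
    }
}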

For my work on AltLaw, I’ve tended to use fully denormalized data, because anything else is too slow.  But I want to be working with normalized data.  I want to have every datum — plus information about its derivation — stored in one giant RDF graph.  But I also want to be able to process this graph efficiently.  Maybe if I had a dozen machines with 64 GB of memory apiece, this wouldn’t be a problem.  But with one desktop, one server, and occasional rides on EC2, that’s not an option.

The ideal design, I think, would be a hybrid system which can do efficient bulk processing of RDF data.  There are some pilot projects to do this with Hadoop, and I’m interested to see how they pan out.  But until then, I’ll have to make do as best I can.

Update: The projects I remembered were HRDF and Heart, which turn out to be the same thing: http://code.google.com/p/hrdf/ and http://wiki.apache.org/incubator/HeartProposal

The Document-Blob Model

Update September 22, 2008: I have abandoned this model.  I’m still using Hadoop, but with a much simpler data model.  I’ll post about it at some point.

Gosh darn, it’s hard to get this right.  In my most recent work on AltLaw, I’ve been building an infrastructure for doing all my back-end data processing using Hadoop and Thrift.

I’ve described this before, but here’s a summary of my situation:

  1. I have a few million files, ~15 GB total.
  2. Many files represent the same logical entity, sometimes in different formats or from different sources.
  3. Every file needs 5-10 steps of clean-up, data extraction, and format conversion.
  4. The files come from ~15 different sources, each requiring different processing.
  5. I get 20-50 new files daily.

I want to be able to process all this efficiently, but I also want to be able to change my mind.  I can never say, “I’ve run this process on this batch of files, so I never need to do it again.”  I might improve the code, or I might find that I need to go back to the original files to get some other kind of data.

Hadoop was the obvious choice for efficiency.  Thrift is a good, compact data format that’s easier to use than Hadoop’s native data structures.  The only question is, what’s the schema?  Or, more simply, what do I want to store?

I’ve come up with what, for want of a better term, I call the Document-Blob Model.

I start with a collection of Documents.  Each Document represents a single, logical entity, like a court case or a section of statute.  A Document contains an integer identifier and an array of Blobs, nothing more.

What is a Blob?  Good question.  It’s data, any data.  It may represent a normal file, in which case it stores the content of that file and some metadata like the MIME type.  It may also represent a data structure.  Because unused fields do not occupy any space in Thrift’s binary format, the Blob type can have fields for every structure I might want to use now or in the future.  In effect, a Blob is a polymorphic type that can become any other type.

So how do I know which type it is?  By where it came from.  Each Blob is tagged with the name of its “provider”.  For files downloaded in bulk, the provider is the web site or service where I got them.  For generated files, the provider is the class or script that created them, with a version number.
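
In Java terms, the generated classes amount to something like this sketch (field names here are illustrative, not the exact Thrift IDL):

import java.util.List;

// Illustrative shapes only; the real Thrift IDL has more fields than this.
class Blob {
    String provider;   // who produced this blob: a source site, or a converter class + version
    String mimeType;   // set when the blob wraps an ordinary file
    byte[] content;    // raw bytes for file-like blobs
    // ...plus optional fields for any structured data extracted later.
    // Unset fields cost nothing in Thrift's binary encoding, which is what
    // makes this "polymorphic" trick cheap.
}

class Document {
    int id;            // integer identifier for the logical entity
    List<Blob> blobs;  // everything known about that entity, in every format
}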

So I have a few hundred thousand Documents, each containing several Blobs.  I represent each conversion/extraction/processing step as its own Java class.  All those classes implement the same, simple interface: take one Blob as input and return another Blob as output.

Helper functions allow me to say things like, “Take this Document, find the Blob that was generated by class X.  Run class Y on that Blob, and append the result to the original Document.”  In this way, I can stack multiple processing steps into a single Hadoop job, but retain the option of reusing or rearranging those steps later on.
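
In code, the step interface and one such helper come out to roughly this (the names are illustrative, not the actual classes):

// Illustrative names; the real step classes are hand-written Java over
// Thrift-generated Document and Blob types.
interface BlobTransform {
    Blob apply(Blob input);
}

class Steps {
    // "Take this Document, find the Blob produced by `producedBy`, run `step`
    // on it, and append the result, tagged with the step's own name."
    static Document applyStep(Document doc, String producedBy,
                              BlobTransform step, String stepName) {
        Blob input = null;
        for (Blob b : doc.blobs) {
            if (producedBy.equals(b.provider)) {
                input = b;
                break;
            }
        }
        if (input != null) {
            Blob result = step.apply(input);
            result.provider = stepName;   // record the new blob's provenance
            doc.blobs.add(result);
        }
        return doc;
    }
}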

Will this work?  I have no idea.  I just came up with it last week.  Today I successfully ran a 5-step job on ~700,000 documents from the public.resource.org federal case corpus.  It took about an hour on a 10-node Hadoop/EC2 cluster.

The real test will come when I apply this model to the much messier collection of files we downloaded directly from the federal courts.

EC2 Authorizations for Hadoop

I just did my first test-run of a Hadoop cluster on Amazon EC2. It’s not as tricky as it appears, although I ran into some snags, which I’ll document here. I also found these pages helpful: EC2 on Hadoop Wiki and manAmplified.

First, make sure the EC2 API tools are installed and on your path. Also make sure the EC2 environment variables are set. I added the following to my ~/.bashrc:

export EC2_HOME=$HOME/ec2-api-tools-1.3-19403
export EC2_PRIVATE_KEY=$HOME/.ec2/MY_PRIVATE_KEY_FILE
export EC2_CERT=$HOME/.ec2/MY_CERT_FILE
export PATH=$PATH:$EC2_HOME/bin

I also copied my generated SSH key to ~/.ec2/id_rsa-MY_KEY_NAME.

You need authorizations for the EC2 security group that Hadoop uses. The scripts in hadoop-*/src/contrib/ec2 are supposed to do this for you, but they didn’t for me. I had to do:

ec2-add-group hadoop-cluster-group -d "Group for Hadoop clusters."
ec2-authorize hadoop-cluster-group -p 22
ec2-authorize hadoop-cluster-group -o hadoop-cluster-group -u YOUR_AWS_ACCOUNT_ID
ec2-authorize hadoop-cluster-group -p 50030
ec2-authorize hadoop-cluster-group -p 50060

The first line creates the security group. The second line lets you SSH into it. The third line lets the individual nodes in the cluster communicate with one another. The fourth and fifth lines are optional; they let you monitor your MapReduce jobs through Hadoop’s web interface. (If you have a fixed IP address, you can be slightly more secure by adding -s YOUR_ADDRESS to the commands above.)

These authorizations are permanently tied to your AWS account, not to any particular group of instances, so you only need to do this once. You can see your current EC2 authorization settings with ec2-describe-group; the output should look something like this:

GROUP   YOUR_AWS_ID    hadoop-cluster-group    Group for Hadoop clusters.
PERMISSION      YOUR_AWS_ID    hadoop-cluster-group    ALLOWS  all                     FROM    USER    YOUR_AWS_ID    GRPNAME hadoop-cluster-group
PERMISSION      YOUR_AWS_ID    hadoop-cluster-group    ALLOWS  tcp     22      22      FROM    CIDR    0.0.0.0/0

With additional lines for ports 50030 and 50060, if you enabled those.

A Million Little Files

My PC-oriented brain says it’s easier to work with a million small files than one gigantic file. Hadoop says the opposite — big files are stored contiguously on disk, so they can be read/written efficiently. UNIX tar files work on the same principle, but Hadoop can’t read them directly because they don’t contain enough information for efficient splitting. So, I wrote a program to convert tar files into Hadoop sequence files.

Here’s some code (Apache license), including all the Apache jars needed to make it work:

tar-to-seq.tar.gz (6.1 MB)

Unpack it and run:

java -jar tar-to-seq.jar tar-file sequence-file

The output sequence file is BLOCK-compressed, about 1.4 times the size of a bzip2-compressed tar file. Each key is the name of a file (a Hadoop “Text”), the value is the binary contents of the file (a BytesWritable).
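
The core of the conversion is a single loop.  This sketch is not the code in the tarball above (it leans on Apache Commons Compress for the tar parsing, expects an uncompressed tar, and skips error handling), but it shows the idea:

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Copy each regular file in a tar archive into a BLOCK-compressed
// SequenceFile: file name as the key, file bytes as the value.
public class TarToSeqSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, new Path(args[1]),
                Text.class, BytesWritable.class,
                SequenceFile.CompressionType.BLOCK);
        TarArchiveInputStream tar = new TarArchiveInputStream(
                new BufferedInputStream(new FileInputStream(args[0])));
        TarArchiveEntry entry;
        while ((entry = tar.getNextTarEntry()) != null) {
            if (entry.isDirectory()) continue;
            byte[] contents = IOUtils.toByteArray(tar);  // reads just this entry
            writer.append(new Text(entry.getName()), new BytesWritable(contents));
        }
        tar.close();
        writer.close();
    }
}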

It took about an hour and a half to convert a 615MB tar.bz2 file to an 868MB sequence file. That’s slow, but it only has to be done once.

Disk is the New Tape

An interesting scenario from Doug Cutting: Say you have a terabyte of data, on a disk with 10ms seek time and 100MB/s max throughput. You want to update 1% of the records. If you do it with random-access seeks, it takes 35 days to finish. On the other hand, if you scan the entire database sequentially and write it back out again, it takes 5.6 hours.
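
The arithmetic is easy to reproduce.  The sequential figure is just reading and rewriting the full terabyte at 100MB/s; to land on the 35-day figure I have to assume 100-byte records and about three seeks per updated record, since the scenario doesn’t spell those out:

// Rough arithmetic behind the 35-day vs. 5.6-hour comparison.
// Record size and seeks-per-update are assumptions, not Cutting's exact model.
public class DiskIsTheNewTape {
    public static void main(String[] args) {
        double diskBytes  = 1e12;    // 1 TB of data
        double throughput = 100e6;   // 100 MB/s sequential transfer
        double seekSecs   = 0.010;   // 10 ms per seek

        // Sequential: read the whole terabyte and write it back out.
        double seqSecs = 2 * diskBytes / throughput;
        System.out.printf("sequential rewrite: %.1f hours%n", seqSecs / 3600);

        // Random access: 1% of ten billion 100-byte records is 100 million
        // updates; at ~3 seeks each, the seeks alone dominate everything.
        double updates = 0.01 * (diskBytes / 100);
        double randSecs = updates * 3 * seekSecs;
        System.out.printf("seek-based updates: %.0f days%n", randSecs / 86400);
    }
}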

This is why Hadoop only supports linear access to the filesystem. It’s also why Hadoop coder Tom White says disks have become tapes.  All this is contrary to the way I think about data — building hash tables and indexes to do fast random-access lookups.  I’m still trying to get my head around this concept of “linear” data processing.  But I have found that I can do some things faster by reading sequentially through a batch of files rather than trying to stuff everything in a database (RDF or SQL) and doing big join queries.

Continuous Integration for Data

As I told a friend recently, I’m pretty happy with the front-end code of AltLaw.  It’s just a simple Ruby on Rails app that uses Solr for search and storage.  The code is small and easy to maintain.

What I’m not happy with is the back-end code, the data extraction, formatting, and indexing.  It’s a hodge-podge of Ruby, Perl, Clojure, C, shell scripts, SQL, XML, RDF, and text files that could make the most dedicated Unix hacker blanch.  It works, but just barely, and I panic every time I think about implementing a new feature.

This is a software engineering problem rather than a pure computer science problem, and I never pretended to be a software engineer.  (I never pretended to be a computer scientist, either.)  It might also be a problem for a larger team than an army of one (plus a few volunteers).

But given that I can get more processing power (via Amazon) more easily than I can get more programmers, how can I make use of the resources I have to enhance my own productivity?

I’m studying Hadoop and Cascading in the hopes that they will help.  But those systems are inherently batch-oriented.  I’d like to move away from a batch processing model if I can.  Given that AltLaw acquires 50 to 100 new cases per day, adding to a growing database of over half a million, what I would really like to have is a kind of “continuous integration” process for data.  I want a server that runs continuously, accepting new data and new code and automatically re-running processes as needed to keep up with dependencies.  Perhaps given a year of free time I could invent this myself, but I’m too busy debugging my shell scripts.