09 April 2011

Dealing with lots of small files in Hadoop MapReduce with CombineFileInputFormat

Input to a Hadoop MapReduce job is abstracted by InputFormat. FileInputFormat is the default implementation that deals with files in HDFS. With FileInputFormat, each file is split into one or more InputSplits, typically upper bounded by the block size. This means the number of input splits is lower bounded by the number of input files. This is not an ideal environment for MapReduce when dealing with a large number of small files, because the overhead of coordinating distributed processes is far greater than when there is a relatively small number of large files. Note also that when an input split spills over block boundaries it can work against the general rule of 'having process close to data', because the blocks could be at different network locations.

Enter CombineFileInputFormat: it packs many files into each split so that each mapper has more to process. CombineFileInputFormat takes node and rack locality into account when deciding which blocks to place in the same split, so it doesn't suffer from the locality problem that simply using a big split size would cause.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueLineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class MyCombineFileInputFormat extends CombineFileInputFormat<Text, Text> {

  public static class MyKeyValueLineRecordReader implements RecordReader<Text, Text> {
    private final KeyValueLineRecordReader delegate;

    // CombineFileRecordReader constructs one of these per file in the combined split;
    // idx identifies which file of the CombineFileSplit this reader is responsible for.
    public MyKeyValueLineRecordReader(
      CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException {
      FileSplit fileSplit = new FileSplit(
        split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations());
      delegate = new KeyValueLineRecordReader(conf, fileSplit);
    }

    @Override
    public boolean next(Text key, Text value) throws IOException {
      return delegate.next(key, value);
    }

    @Override
    public Text createKey() {
      return delegate.createKey();
    }

    @Override
    public Text createValue() {
      return delegate.createValue();
    }

    @Override
    public long getPos() throws IOException {
      return delegate.getPos();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public float getProgress() throws IOException {
      return delegate.getProgress();
    }
  }

  @Override
  public RecordReader<Text, Text> getRecordReader(
    InputSplit split, JobConf job, Reporter reporter) throws IOException {
    // CombineFileRecordReader reflectively instantiates one MyKeyValueLineRecordReader
    // per file in the combined split; the raw Class cast keeps its constructor happy.
    return new CombineFileRecordReader<Text, Text>(
      job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class);
  }
}

CombineFileInputFormat is an abstract class that you need to extend, overriding the getRecordReader method. CombineFileRecordReader manages the multiple input splits within a CombineFileSplit simply by constructing a new RecordReader for each of them. MyKeyValueLineRecordReader creates a KeyValueLineRecordReader to delegate operations to.

Remember to set mapred.max.split.size to a small multiple of the block size in bytes, as otherwise there will be no splitting at all and the whole input can end up in a single split.
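
A minimal sketch of the job wiring with the old mapred API; the driver class name, the input path and the 64MB block size figure here are my own assumptions for illustration.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class CombineJobDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(CombineJobDriver.class);
    job.setJobName("combine-small-files");
    job.setInputFormat(MyCombineFileInputFormat.class);
    // Cap each combined split at roughly two 64MB blocks so maps still spread across the cluster.
    job.setLong("mapred.max.split.size", 2L * 64 * 1024 * 1024);
    FileInputFormat.setInputPaths(job, new Path("/input/small-files"));
    // ... set the mapper, reducer and output path as usual, then submit.
    JobClient.runJob(job);
  }
}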

03 April 2011

Search over key value storage

This is a post about my talk ‘search over key value storage’ for our recent away day at Forward.

It's exciting to see what people implement over simple key value storage. Through its inherent limitation of simple key lookups, key value storage encourages developers to think in terms of data access patterns. This is a good thing: after all, your data storage strategy is largely limited by your choice of key value storage and governed by your data access patterns. Thinking in terms of the data access patterns your storage needs to support will help you choose the right implementation.

The term ‘search’ is a rather broad one; here I'm just interested in realtime lookup based on redundant query indexes.

Let’s look at an example of building query indexes. I’m going to use Redis as my choice of key value storage to build a weblog publishing tool. I could model my blog entries like below.
  entry:9 => {
    title => 'processing event log part1', 
    tags => [nosql, couchdb],
    date_published => 01/11/2010
  }
  entry:10 => {
    title => 'applying agile practices', 
    tags => [randomthought, agilepractice],
    date_published => 20/11/2010
  }
  entry:11 => {
    title => 'search over key value storage',
    tags => [nosql, randomthought],
    date_published => 13/12/2010
  }
To support lookup by tags I would need a set of entry ids keyed by each tag. This is a typical property match lookup. It's important to realise that you don't need every permutation of tags as keys to support the lookup; set operations take care of the combinations. To look up all the entries tagged nosql but not randomthought, you just need the simple set difference tags:nosql - tags:randomthought (see the sketch after the index layout below).
  tags:nosql => [9,11]
  tags:randomthought => [10,11]
  tags:couchdb => [9]
  tags:agilepractice => [10]
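
As a rough sketch, this is what building the tag index and taking the set difference looks like through Jedis, a Java Redis client; the client choice is my own here, and any Redis client exposes the same SADD and SDIFF commands.

  import java.util.Set;

  import redis.clients.jedis.Jedis;

  public class TagLookupSketch {
    public static void main(String[] args) {
      Jedis redis = new Jedis("localhost");

      // Build the tag index: one set of entry ids per tag.
      redis.sadd("tags:nosql", "9");
      redis.sadd("tags:nosql", "11");
      redis.sadd("tags:randomthought", "10");
      redis.sadd("tags:randomthought", "11");
      redis.sadd("tags:couchdb", "9");
      redis.sadd("tags:agilepractice", "10");

      // Entries tagged nosql but not randomthought: set difference => [9]
      Set<String> ids = redis.sdiff("tags:nosql", "tags:randomthought");
      System.out.println(ids);
    }
  }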

To support lookup over a date_published range, I would need an ordered set of entry ids weighted so as to honour the natural order of date_published. This is a typical property range lookup. To look up all the entries published between 15/11/2010 and 15/12/2010, work out the weights of those two dates first, then get the subset of entries:by_date_published between those two weights (see the sketch after the layout below). If your key value storage doesn't support ordered sets with custom weights, you can implement a trie on top of your key value storage. Have a read of this white paper.
  entries:by_date_published => [9,10,11]
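
And a sketch of the range lookup with a Redis sorted set, again via Jedis; encoding each date_published as a yyyymmdd integer is just one possible way to get weights that preserve date order.

  import java.util.Set;

  import redis.clients.jedis.Jedis;

  public class DateRangeLookupSketch {
    public static void main(String[] args) {
      Jedis redis = new Jedis("localhost");

      // Weight each entry id by its date_published encoded as yyyymmdd,
      // so the sorted set preserves the natural date order.
      redis.zadd("entries:by_date_published", 20101101, "9");
      redis.zadd("entries:by_date_published", 20101120, "10");
      redis.zadd("entries:by_date_published", 20101213, "11");

      // Entries published between 15/11/2010 and 15/12/2010 => [10, 11]
      Set<String> ids = redis.zrangeByScore("entries:by_date_published", 20101115, 20101215);
      System.out.println(ids);
    }
  }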

Should we build query indexes ourselves? Although it was fun to consider different strategies for building query indexes, it always seemed rather tedious and expensive to me. How about putting a search engine on top of the key value storage? The good thing is that, when it comes to indexing the big data that made key value storage interesting in the first place, search engines like ElasticSearch and Solr support a distributed index space and help manage locality of related keys. Here is an example using ElasticSearch with the Rubberband gem.
  @rubberband = ElasticSearch.new('127.0.0.1:9200')
  @rubberband.create_index("yetitrails")
  @rubberband.update_mapping({
    :dynamic => "false",
    :properties => {
      :tags => {:type => "string", :index => "not_analyzed"},
      :date_published => {:type => "date", :format => "dd/MM/yyyy"}
    }
  }, :index => "yetitrails", :type => "entry")

  @rubberband.index(
    {:tags => %w[nosql couchdb], :date_published => "01/11/2010"},
    :index => "yetitrails", :type => "entry")

  @rubberband.index(
    {:tags => %w[randomthought agilepractice], :date_published => "20/11/2010"},
    :index => "yetitrails", :type => "entry")

  @rubberband.index(
    {:tags => %w[nosql randomthought], :date_published => "13/12/2010"},
    :index => "yetitrails", :type => "entry")
  
  @rubberband.search({
    :query => {:constant_score => {
      :filter => {
        :and => [
          {:term => {:tags => "nosql"}},
          {:range => {:date_published =>
            {:from => "15/11/2010", :to => "15/12/2010"}}}
        ]
      }
    }}  
  }, :index => "yetitrails", :type => "entry")

There's an ongoing effort towards better integration between NoSQL databases and search engines, so that all sounds good, but what if we need to search for different things tomorrow? That is directly related to the volatility of your data access patterns and query indexes. Given the nature of big data, key value storage solutions like HBase and Cassandra provide Hadoop integration so that we can reindex in batch. Hadoop contrib/index provides a utility to build or update a Lucene index using a Hadoop cluster.

Given all the complications, should we search over key value storage? It depends on the nature of your problem domain. However, if you do need to search over key value storage, it's not such a bad idea either :)