Scaling counts and uniques using Hadoop
Reading James Hamilton’s post on Google App Engine reminded me to blog about a quick solution to a memory problem I was having in one of our Hadoop jobs. In the analytics world, every metric usually comes with two numbers: counts and uniques. At Lookery we use Hadoop to process all of the data from our logs, and Dumbo to quickly write new jobs using Hadoop Streaming and Python. We also started using the Hadoop Aggregate package for efficiency, since we try to keep as much of the work as we can in the JVM during the combiner/reducer phases. Everything was going great until I began to get out-of-memory errors while performing a UniqValueCount. The solution was as simple and elegant as Brett Slatkin’s suggestion for counters in Google App Engine: “For example, Brett shows how to implement a scalable counter and (nearly) ordered comments using App Engine Megastore. For the former, shard the counter to get write scale and sum them on read.”
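The “shard on write, sum on read” idea is easy to see outside the datastore. The sketch below is a plain in-memory Python illustration, not App Engine or datastore code; `ShardedCounter` and `num_shards` are hypothetical names. Each write goes to one randomly chosen shard, so no single counter becomes a hotspot, and a read pays for that by adding up every shard.

```python
import random


class ShardedCounter:
    """Hypothetical in-memory stand-in for a datastore-backed sharded counter."""

    def __init__(self, num_shards=20):
        # One independent sub-counter per shard; in a datastore each would be
        # a separate entity, so concurrent writers rarely touch the same one.
        self.shards = [0] * num_shards

    def increment(self, amount=1):
        # Spread writes: any shard will do.
        self.shards[random.randrange(len(self.shards))] += amount

    def value(self):
        # Reads do the extra work: sum every shard.
        return sum(self.shards)


counter = ShardedCounter(num_shards=8)
for _ in range(1000):
    counter.increment()
print(counter.value())  # 1000
```

The Hadoop trick in this post follows the same pattern, except that the shard is chosen by hashing the value instead of at random, which is what keeps unique counts correct.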
Imagine millions of log lines of the form ‘timestamp+TAB+userid’. The goal is to count how many calls we saw in total and how many unique users made them during that timeframe. With the Hadoop Aggregate package, all you have to do is emit records like “LongValueSum:Key+TAB+Count” or “UniqValueCount:Key+TAB+Value”, and Hadoop uses a built-in combiner/reducer to return either “Key+TAB+TotalSum” or “Key+TAB+UniqueCount”. But if Hadoop throws out-of-memory exceptions while performing unique counts, all you have to do is “Shard It!” Simply put, hash your value and extend the key to include the partition. Because a given value always hashes to the same partition, no user is counted in two partitions, so the per-partition unique counts can simply be added together. I use a ‘#’ separator to mark a sharded key, and a second iteration of the job LongValueSums all of the individual partitions to get the final count. Below is a simplified Python+Dumbo script that shows the idea with the combiner and reducer written out; in production, you let Hadoop’s built-in aggregate combiner/reducer do that work.
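For comparison with the Dumbo version, here is a minimal sketch of the record format as a plain Hadoop Streaming mapper would produce it. It assumes Python 3, the ‘timestamp+TAB+userid’ input described above, and `zlib.crc32` as the stable hash; `map_lines`, `shard_of`, and the sample log lines are hypothetical, and 32768 partitions simply mirrors the `2 << 14` used in the script.

```python
import zlib

NUM_SHARDS = 2 << 14  # 32768 partitions, mirroring the Dumbo script


def shard_of(userid, num_shards=NUM_SHARDS):
    # crc32 is stable across processes and machines, so every mapper sends a
    # given userid to the same shard (Python 3's str hash() is randomized).
    return zlib.crc32(userid.encode("utf-8")) % num_shards


def map_lines(lines):
    """Yield Aggregate-package records for 'timestamp<TAB>userid' lines."""
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        timestamp, userid = line.split("\t")
        # Total calls: the built-in LongValueSum adds up the 1s.
        yield "LongValueSum:TotalCount\t1"
        # Uniques: the shard number is part of the key, so each reduce group
        # only holds the distinct userids of a single shard.
        yield "UniqValueCount:%d#Uniques\t%s" % (shard_of(userid), userid)


# Made-up sample lines, just to show the shape of the output.
for record in map_lines(["1235500000\talice\n", "1235500001\tbob\n"]):
    print(record)
```

To run it as a real streaming mapper, feed `map_lines(sys.stdin)` and print each record; Hadoop Streaming’s `-reducer aggregate` option then applies the Aggregate package’s built-in combiner/reducer.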
Be aware, though, that I was too lazy to track down the actual issue in Hadoop, since a unique count shouldn’t need much memory if the values arrive sorted. But at least I solved my problem and got a blog post out of it.
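To make the point about sorted input concrete, here is a small Python 3 sketch, separate from the Dumbo script, that counts distinct values while remembering only the previous one. It assumes the values really do arrive sorted: Hadoop groups values by key but does not sort them within a key unless you arrange something like a secondary sort. It is not meant to describe how the Aggregate package’s UniqValueCount is implemented, and `count_uniques_sorted` is a hypothetical name.

```python
def count_uniques_sorted(values):
    """Count distinct items in an iterable that is already sorted.

    Equal items are adjacent, so only the previous item has to be kept in
    memory, unlike a set, which grows with every distinct item.
    """
    count = 0
    previous = object()  # sentinel that compares unequal to any real value
    for value in values:
        if value != previous:
            count += 1
            previous = value
    return count


print(count_uniques_sorted(["alice", "alice", "bob", "carol", "carol"]))  # 3
```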
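#!/usr/bin/env python
# Simplified two-pass Dumbo job: pass 1 counts total calls and per-shard
# uniques, pass 2 strips the shard from each key and sums the partial counts.
import zlib

NUM_SHARDS = 2 << 14  # 32768 partitions for the unique-user key


def mapper1(key, value):
    # key is the byte offset; value is a "timestamp<TAB>userid" log line.
    timestamp, userid = value.split('\t')
    yield "LongValueSum:TotalCount", 1
    # crc32 sends a given userid to the same shard in every mapper process,
    # so no user is counted in two shards.
    shard = zlib.crc32(userid) % NUM_SHARDS
    yield "UniqValueCount:%d#Uniques" % (shard,), userid


def mapper2(key, value):
    # Pass-1 output arrives as (key, count) pairs, e.g. ("TotalCount", n)
    # or ("123#Uniques", n).
    if '#' in key:
        shard, key = key.split('#', 1)  # drop the shard prefix
    yield "LongValueSum:%s" % (key,), value


def aggregate_combiner(key, values):
    agg, subkey = key.split(":", 1)
    if agg == 'LongValueSum':
        yield key, sum(values)
    elif agg == 'UniqValueCount':
        # Forward each distinct value once to cut shuffle traffic.
        for value in set(values):
            yield key, value


def aggregate_reducer(key, values):
    # Strip the "LongValueSum:" / "UniqValueCount:" prefix from the output key.
    agg, subkey = key.split(":", 1)
    if agg == 'LongValueSum':
        yield subkey, sum(values)
    elif agg == 'UniqValueCount':
        yield subkey, len(set(values))


if __name__ == "__main__":
    import dumbo
    job = dumbo.Job()
    job.additer(mapper1, aggregate_reducer, aggregate_combiner)
    job.additer(mapper2, aggregate_reducer, aggregate_combiner)
    job.run()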
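To check that summing per-shard counts really gives the number of unique users, you can replay both passes locally. This is a plain Python 3 sketch with no Dumbo or Hadoop involved; `pass_one`, `pass_two`, `shard_of`, and the deliberately small `NUM_SHARDS` are hypothetical names chosen for the check, and `zlib.crc32` stands in for whatever stable hash you prefer. It works because a stable hash sends every occurrence of a userid to the same shard, so the shards hold disjoint sets of users and their sizes add up exactly.

```python
import zlib
from collections import defaultdict

NUM_SHARDS = 16  # small on purpose for a local check; the script uses 2 << 14


def shard_of(userid, num_shards=NUM_SHARDS):
    # Stable hash: the same userid always lands in the same shard.
    return zlib.crc32(userid.encode("utf-8")) % num_shards


def pass_one(userids, num_shards=NUM_SHARDS):
    # Like the UniqValueCount reducer: distinct userids per "<shard>#Uniques".
    per_key = defaultdict(set)
    for userid in userids:
        per_key["%d#Uniques" % shard_of(userid, num_shards)].add(userid)
    return {key: len(users) for key, users in per_key.items()}


def pass_two(partial_counts):
    # Like mapper2 + LongValueSum: drop the shard prefix and add the counts.
    totals = defaultdict(int)
    for key, count in partial_counts.items():
        totals[key.split("#", 1)[-1]] += count
    return dict(totals)


log = ["alice", "bob", "alice", "carol", "dave", "bob", "erin"]
print(pass_two(pass_one(log)), len(set(log)))  # {'Uniques': 5} 5
```

With `num_shards=1` this degenerates to the original unsharded UniqValueCount; raising it only changes how many distinct values each reduce group has to hold in memory, not the final total.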
About this entry
You’re currently reading “Scaling counts and uniques using Hadoop,” an entry on Elias Torres
- Published: 02.24.09 / 3pm
- Category: Lookery