The Gluster Blog

Gluster blog stories provide high-level spotlights on our users all over the world

Hadoop: Denormalization of many-to-many data by using multiple map keys for a single value.

Gluster
2012-07-23

When I first read about reduce side joins in hadoop, I spent some time walking through a bunch of examples from this whitepaper by Jairam Chandar on Hadoop join-algorithms.

In the beggining, everything seemed simple enough – because I was focusing on joins over 1-1 data relations.  For example, if  we need to join a persons individual “name” to their indivudual “shoesize”, via their “id”, we are only joining a finite number of elements (2) into a single outputted data structure.

On the far left, we have a file with key->value pairs (i.e. typically, these might be tab separated in hadoop).  These are read in by “trivial” mappers, which do nothing other than emit the separate key/values, which are then shuffle/sorted into the same data tuple, which is then the input to a single reduce function.  The join can be implemented in the reducer very easily, by simply adding the first and second “values” to a concatenated string, or a MapWritable, or any other data type.

These types of joins can be done in a very straightforward way in hadoop, often involving a little hacking with character separators, or pair objects.

But what about Graphs, Relations, and Networks ? 

When we talk about “big-data”, we ultimately are going to have to face the fact that some data sets are combinatorial expansive.  Social networks are a great example : The amount of interconnections increase geometrically with respect to the number of nodes.   When joining data in this paradigm, the simpler strategies for map/reduce joins do not work.

So lets learn about how we can use many emissions of keys from SINGLE mapper, in order to create an arbitrarily large number of nested view, where each node is a key which can ultimately have many related nodes in its value tuples. 

The 2nd job in a two job flow which produces highly-denormalized, accumulated data: On the far left, we depict that each user is related to many other users, for example both “steve” and “laly” are related to “jayunit100”, and the “Megajoiner job #1″ captures this information for us, emitting objects which have “lists” of related nodes.  Thus the input to Megajoiner job #2 is akin to a pojo (containing a user’s data), combined with an adjacency list (which has all the users friends).  Now, if we emit steve’s data, n times (where n is the number of friends steve has), with each key being the friend’s id (shown above is 1234, the id of jayunit100), we will now have steve’s data mapped into jayunit100’s reducer.  The power of this is that, since we are doing this for ALL users, we have each node’s reducer accessing all related node’s data !  This gives us a massively denormalized “summary” of all related nodes data for any given node.  For example, on the far right, we can see that our reducer receives a large list of information, which can then be used to create a “snapshot” of all of the shoesizes and names of individuals who are known to be related to the jayunit100 user.

Megajoiner job #1: Join all user information to user interactions: thus creating an intermediate hadoop output file which contains objects which have (1) self-describing data (i.e. shoesize, id, name) and (2) A adjacency list of id’s for all related objects (i.e. id’s of friends).

Our first job will scan the entire network of individuals, emitting data about (1) a given individual and (2) ids of all his related nodes (i.e. his friends).  So, it will emit the key 1234, with jayunit100’s information, and additionally, it can store, inside of jayunit100’s information, a list of ids for friends of jayunit100 (i.e. 4567, 8999).  It will also emit 1234->4567,8999.  These relations will “guide” the next job’s mapper – which can emit the same personal information over and over again, each time with a different key. That is , the next job will be able to scan through these relations (4567, 8999), and emit each one as a key which points to the data of jayunit100. 

{id:1234 name:”jayunit100″ shoesize:8.5 {relations:[4567, 8999]}

{id:8999 name:”steve” shoesize:6 {relations:[4567, 1234]}

Megajoiner job #2: Invert the keys: Use the embedded adjacency list ids as keys, and emit all of them pointing to the primary data for the node in which they are found.  That is, emit a related node as the KEY of the individual record’s value.  This leads to the same data being sent to many different reducers, like this:

1234->
 {id:1234 name:”jayunit100″ shoesize:8.5},
 {id:8999 name:”steve” shoesize:6 {relations:[4567, 1234]},
 …

This is somewhat counterintuitive, since most of us “learned” to create hash data structures by defining keys which uniquely map to values.The key thing to realize here, however, is that keys are just a “routing” mechanism in hadoop, which allow us to optimize and scale calculations by sending data to a large number of distributed nodes.

So, again, in this case, we are actually emitting MANY keys for a SINGLE value. This way, hadoop’s “shuffle and sort” will send many different values to a given reducer, so that we can emit a large summary with information from many different indices, allowing for extremely denormalized data outputs, which can be independently utilized without having to do extra table scans or lookups.

This sort of thing is extremely important in cases where, for example, we want to recover summary data for a webservice in a matter of milliseconds.

TADAAAA ! 

In Job2 we can now join an ARBITRARY number of records.  This is akin to the cross product which we all learned about in discrete mathematics and set theory some time ago.  By emitting the same data, over and over again, simply varying keys, we increase the number of reducers which will have access to a given record.  We can create super-high performance, denormalized database indices in a massively parallel Map-reduce idiom !  This strategy requires only as much memory as any individual final result requires: there is no need to use an intermediate data structure that holds large cache’s of personal data, nor is there a need to use a large external query system to iteratively join data over and over again.  In other words : This approach scales (even in extreme scenarios where a given node has many related nodes… because the reducer inputs do not need to be stored in memory).

A quick aside : You’ll notice that I used a richer data abstraction for the join values in these examples…  Most of the time, especially in tutorials, we see simpler TextWritable or other, more primitive writable types.  You might be wondering how the concrete implementation of such a join would work: and there are many answers.  Richer data types abound in real-world MapReduce jobs- we can use pojos that are serialized (avro/thrift/protocolbuffers), or MapWritables to deal with complex value types.  Alternatively, we can use a more semantically rich abstraction for data processing, such as that provided by the pangool project, from the datasalt folks which gives you an abstract framework for dealing with the implementation specific details of dealing with hadoop records which have many values.  Pangool allows you to use relational and predicate logic in defining such map/reduce transformations.   

In any case… No matter what hammer you choose to drive your MapReduce jobs, the core point here is that an understanding of the way we can cleverly use keys to distribute workloads to reducers is part of the beauty of the entire MapReduce paradigm.  In this case, we do so by overloading keys for a given value. In other cases, we might do the opposite: we may desire to overload multiple values for a given key.

BLOG

  • 06 Dec 2020
    Looking back at 2020 – with g...

    2020 has not been a year we would have been able to predict. With a worldwide pandemic and lives thrown out of gear, as we head into 2021, we are thankful that our community and project continued to receive new developers, users and make small gains. For that and a...

    Read more
  • 27 Apr 2020
    Update from the team

    It has been a while since we provided an update to the Gluster community. Across the world various nations, states and localities have put together sets of guidelines around shelter-in-place and quarantine. We request our community members to stay safe, to care for their loved ones, to continue to be...

    Read more
  • 03 Feb 2020
    Building a longer term focus for Gl...

    The initial rounds of conversation around the planning of content for release 8 has helped the project identify one key thing – the need to stagger out features and enhancements over multiple releases. Thus, while release 8 is unlikely to be feature heavy as previous releases, it will be the...

    Read more