
The tiny microbe’s survival guide for Hadoop deployment.

Gluster
2012-07-27
Scalable code is not enough… you need to know how your big data platform works on the inside.

This post is about how to switch paradigms.  It assumes that you’ve seen one or two of the thousands of big-data sales pitch videos.  Maybe you’ve even skimmed a few parts of the elephant book.  So what’s next?

Now – it’s time to start RUNNING your MapReduce jobs in a real cluster…

From “write once, run anywhere” to “write once, run EVERYWHERE”.

Map/reduce jobs run over several machines at once, and these machines need to play nice with each other.  Although you don’t have to be a network engineer to set up a hadoop platform – you do have to be ready to think about data and computation quite differently.  You can’t drag files around, turn off a few firewalls, and pray for the best… Rather: you need to understand Unix, the JVM, threads, and cloud-based file systems like S3.

In particular, you must also understand how these different technologies interplay in order to compose a maintainable, continuously deployable beacon of big-data beauty:

A rough diagram of the way our hadoop deployments work with respect to existing libraries and tools… It’s important to note that even the CODE that mappers and reducers run is loaded, by indirection, into HDFS, just as the data is.  Thus, you have to be comfortable with several different technologies in order to run code, integrate data, and deploy data on HDFS.

So: here goes.  The tiny microbe’s simplified Hadoop deployment survival guide!

Many of these bullets are directly related to EMR – Amazon’s dynamic, super-scalable map/reduce computing platform (but they apply equally to any large, distributed computation infrastructure that is based on a hadoop-like paradigm).

1) The easy stuff: CHECK YOUR PLATFORM.

Get a bird’s-eye view of HDFS:

Do you really have “big data”?  There’s only one way to find out:

  • hadoop fs -dus (total size of all files).
  • hadoop fs -du (get the cumulative file sizes, by directory). 
  • hadoop fs -lsr (recursively list all files).

Elastic MapReduce: Know your instances!

One of the most critical mistakes I made on EMR was that of ignoring existing AMI-instance information.  This can easily be ascertained via the EMR elastic-mapreduce ruby client:
    elastic-mapreduce --region eu-west-1 --describe --jobflow XXXXXXX
Ignoring most of the boilerplate, we see the AmiVersion as well as the HadoopVersion.

    "AmiVersion": "latest",
    "HadoopVersion": "0.20.205"

These are critical: an old AmiVersion can be a harbinger of doom.  Your shell configuration scripts, and even the internal mapper/reducer Java classpath settings, can be completely thrown off if old or incompatible jars or “obsolete” resources dominate your executable paths or classpaths.  The simplest way to broadly avoid such issues is to make sure you have sane AmiVersion and HadoopVersion values by running this simple command.

I learned this the hard way by using an old version of the Amazon ruby elastic-mapreduce client.  Old client scripts can lead to old AMI deployments, even if you specify “latest” all over the place!  So make sure, in addition to checking your cluster after setting it up, that you’re using the “right” client-side tools for deploying and setting up your infrastructure.

2) Custom configuration cannot be avoided.

Hadoop allows you to toggle everything from the amount of memory used in sorting, to default timeouts, to the total number of counters, and even the way the classpath is built.  YOU WILL VERY LIKELY HAVE TO MODIFY THESE at some point, because every map/reduce job has its own idiosyncrasies.  Understanding the configuration of hadoop jobs and clusters (these are, of course, two distinct configuration tasks) is an essential part of understanding the basics of how hadoop works, and you need to understand how hadoop works in order to be confident when running large, expensive map/reduce jobs.
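
As a minimal sketch (the property names are the Hadoop 0.20-era ones and the values are purely illustrative – check your version’s defaults before copying anything), tweaking a few job-level knobs programmatically looks something like this:

    // a minimal sketch: tune a few common job-level knobs before submitting a job
    // (values here are illustrative, not recommendations)
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    Configuration conf = new Configuration();
    conf.setInt("io.sort.mb", 200);                    // memory (MB) used when sorting map output
    conf.setInt("mapred.task.timeout", 1200000);       // per-task timeout, in milliseconds
    conf.setInt("mapred.job.reuse.jvm.num.tasks", 10); // let each JVM run up to 10 tasks
    Job job = new Job(conf, "my-configured-job");
    // ...set mapper/reducer/input/output as usual, then job.waitForCompletion(true)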

I’ve been using these links a lot lately.  I’ll update this with time:

  • For configuring specific jobs, the best resource is the actual hadoop javadocs: these are the most up-to-date documentation available.
  • The “official” configuration docs: although there might be some typos, these are especially useful for configuring cluster-specific parameters.
  • The essential Cloudera tuning basics.  These will affect both the way you run, as well as write, your map/reduce jobs.
  • Another important “official” article on how the JVM gets reused.  I found this confusing at first, but it’s really important to understand, especially for large jobs, where latency can be a big problem.
  • Remember: changing your configuration parameters (i.e. those in conf/*.xml) might require you to restart your name-node / task trackers!

3) Classpaths: Something fishy? Debug them at runtime.

Classpath issues are easy to fix on simple webservers: you can hard-code them, you can bundle them… in fact, you can even run a simple webserver right out of your IDE.  Obviously, this is a no-no for distributed, large-scale computation.  Rather than run away from the almighty Java classpath, embrace it as your friend.  It’s EASY to debug!

    // a snippet adapted from http://www.mkyong.com/java/
    // requires: import java.net.URL; import java.net.URLClassLoader;

    public static void dumpClasspath() {
        ClassLoader cl = ClassLoader.getSystemClassLoader();
        // System.out is a bad idea in the cloud! Log each classpath entry instead.
        for (URL url : ((URLClassLoader) cl).getURLs()) {
            log.info(url.toString());
        }
    }

The above snippet will dump each and every classpath entry to your logs, eliminating the guesswork associated with questions like “dammit… which jar is it using for commons-xxx?”.

4) Logging: Yes sir! You can still barf all over the console!

System.outs don’t work in the cloud – the JVMs running your map/reduce jobs run on machines completely separate from the one you use to interface with the NameNode.  BUT, hadoop’s web interface collects all of your logging information for each mapper/reducer for you… So… stop using System.out, and start using logs, for EVERYTHING that you want to be able to see and debug.  We use slf4j, a library-independent logging facade, which simply picks up a concrete logger (e.g. log4j) off your classpath for you, eliminating variability.
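
As a minimal sketch (the mapper class, key/value types and messages are purely illustrative), wiring an slf4j logger into a mapper looks something like this:

    // a minimal sketch: slf4j logging from inside a mapper (names are illustrative)
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CrawlMapper extends Mapper<LongWritable, Text, Text, Text> {
        private static final Logger log = LoggerFactory.getLogger(CrawlMapper.class);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // shows up in the per-task logs in the hadoop web ui, not on your console
            log.info("processing record at offset {}", key.get());
            context.write(new Text("seen"), value);
        }
    }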

Now – you can use your logs to debug anything that goes wrong through the hadoop web ui. The hadoop ui provides a 4KB, 8KB, and FULL view of all logs for EVERY single node, so, when something goes wrong – here’s how you track it down:

  • Go to your task tracker interface.
  • Look for the “failed tasks” data table. 
  • Click on the “failed task”.
  • Click on the “Logs” column (in EMR, you do this with the “links” command-line web browser, and you can download any whole file onto disk through the links interface).

5) Unit tests: Real data, real expectations.

A lot of unit tests simply test that your code doesn’t crash miserably, for example:

     Assert.assertTrue(myUrl != null);

Now – imagine you had to crawl 1,000,000 web pages – based on stringified urls.  There are a LOT of possible bad urls that might get sent into any such crawler, for example (“”, “http://<your url goes here>”, “1xxxx”, etc…).  A much better test, then, would be to confirm that any URL generator/parser/extractor is ACTUALLY generating valid, useful urls:

    Assert.assertTrue(new URL(myUrl).getHost().length() > 1);
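
Taken a step further, a minimal sketch of such a test (extractUrl() here is a hypothetical stand-in for whatever produces urls in your own pipeline):

    // a minimal sketch of a unit test against realistic inputs
    import java.net.URL;
    import org.junit.Assert;
    import org.junit.Test;

    public class UrlExtractionTest {
        // hypothetical stand-in for whatever produces urls in your real pipeline
        private String extractUrl(String raw) { return raw.trim(); }

        @Test
        public void extractedUrlsHaveRealHosts() throws Exception {
            String[] rawInputs = { "http://www.gluster.org/blog", "http://example.com/a?b=c" };
            for (String raw : rawInputs) {
                URL url = new URL(extractUrl(raw));            // blows up on truly malformed input
                Assert.assertTrue(url.getHost().length() > 1); // and demands a real host, not ""
            }
        }
    }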

6) For production: Integration tests are more important than unit tests!

When we decide to test a full pipeline of distributed tasks, we need to know that, when run on a reasonably sized data set, we are not:

   1) overlogging
   2) creating memory leaks
   3) filling up the heap with unnecessary objects in global variables
   4) maintaining state between computations of orthogonal records

These sorts of bugs are easily seen in a reasonably sized integration test, but lost in small unit tests with mocked data.  Unit tests are, of course, an important developer tool for creating maintainable code – but they are more of a diagnostic tool than anything else.  When running at scale, you MUST first test your data locally with real integration tests that process large amounts of data (that is, an amount of data that local machines can reasonably deal with in 5 to 10 minutes).
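
As a minimal sketch, such a test might look like the following, assuming a hypothetical WordCountJob driver that implements Tool and a local fixture file large enough to be interesting:

    // a minimal sketch: push a realistically sized local file through a real job
    // (WordCountJob and the file paths are hypothetical placeholders)
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.junit.Assert;
    import org.junit.Test;

    public class WordCountIT {
        @Test
        public void runsAgainstRealisticLocalData() throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.default.name", "file:///");   // local filesystem
            conf.set("mapred.job.tracker", "local");   // local job runner, no cluster needed

            // a few hundred MB of real(ish) input is enough to surface memory leaks,
            // runaway logging, and state carried between records
            int exitCode = ToolRunner.run(conf, new WordCountJob(),
                new String[] { "src/test/resources/big-sample.txt", "target/it-out" });

            Assert.assertEquals(0, exitCode);
            Assert.assertTrue(new java.io.File("target/it-out/part-r-00000").length() > 0);
        }
    }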

7) Yo: Did I mention the classpath?

I gotta get back to this: the classpath really is important.  Here are two things to remember:

   a) There are 2 classpaths.  The first is the classpath that YOUR CODE has; this is known as the “HADOOP_CLASSPATH”… And

   b) The runtime classpath: the classpath that YOUR MAPPERS/REDUCERS run under.  Again, you can use a simple runtime dump of the classpath (you had better log it if you want to see it through the hadoop web interface) to debug any issues.  If you combine a lack of rigor about your classpath with a problem like (1) above (laziness in inspecting and understanding your cluster setup), you expose yourself to a whole host of runtime ClassNotFound or method-not-found errors, caused by your runtime mappers/reducers being inundated with irrelevant, obsolete libraries that precede the ones you use for development.  A sketch of shipping job dependencies onto the runtime classpath follows below.
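
One common way to control what the mappers/reducers actually see at runtime (a minimal sketch – the HDFS jar path here is just an example) is to push your dependency jars into HDFS and add them to the task classpath via the DistributedCache:

    // a minimal sketch: add a dependency jar (already uploaded to HDFS) to the
    // classpath of every mapper/reducer; the HDFS path here is just an example
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    DistributedCache.addFileToClassPath(new Path("/libs/commons-lang-2.6.jar"), conf);
    // ...build and submit the Job with this conf as usual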

8) It’s not enough to scale your processing… you also need scalable data views!

Calculating data for a large input set in a short period of time is great, but you need to serve it up to add value to your business.  Make damn sure you have a scalable view platform.  The type of “sink” that you put your hadoop data into will depend on the use case.  Here is a brief summary of what I’ve been playing with lately.  Of course, this is but a small sample:

  • SOLR: A search engine, where all fields can be indexed.  You can query by indices, regular expressions, and retrieve data as XML or JSON.  SOLR is memory intensive, caching results to provide high-performance indexes over the many attributes of each tuple.
  • DYNAMODB: A key/value store that’s rapidly scalable.  Amazon has reported up to 250K inserts per second.  Lookups are done by keys.  DYNAMODB is run entirely off of solid state disks, in the cloud, by Amazon, for you 🙂  So… you really don’t have to worry about the details too much.  Great for lazy programmers that don’t want to worry about administering a large data store 🙂  The downside is that there are limitations on record sizes (64KB), and the indexing isn’t as broad as SOLR’s.
  • Columnar databases like Cassandra / HBase: These sorts of databases depart from the relational “table-blocks” approach by creating column blocks.  Columnar databases generally scale better when dealing with large, dynamic data sets.  For example, when adding new features to a record (i.e. adding a “social security” column to a “persons” database), individual records (“persons” in this example) don’t have to be modified to accommodate the newly introduced data type, since each column is a separate disk block.
  • Relational systems may still work… with the added benefit of making your sysadmin happy 🙂 For a few million reasonably sized records, you may not even need any fancy NoSQL databases at all.  An unsharded MySQL instance might do the trick.  And of course, you have the advantage that “everyone” knows SQL.

9) Deal with dirty data by cleaning and counting… not by ignoring!

Just because you’re processing unstructured text doesn’t mean you can hack without validation.  Strive for jobs that specialize in handling clean OR dirty records – but not both.  Why?  Because you should be able to define attributes and relationships, and know that those relationships are preserved logically throughout your pipeline.

  • If people are defined as having a “shoe size”, make sure that you can count how many do, and do not, have records for their “shoe sizes” (a sketch of this kind of counting follows after this list).
  • If every individual needs a social security number… throw an exception when you see one that goes missing, and test for that exception exhaustively.
  • If quality exceptions are hurting you by killing a job: prepend a cleaning job ahead of your pipeline, rather than avoiding quality exceptions altogether.
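
As a minimal sketch of that kind of counting inside a mapper (the tab-separated record format and field names are purely illustrative):

    // a minimal sketch: count records with and without a field instead of silently dropping them
    // (the tab-separated "name\tshoeSize" format here is purely illustrative)
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ShoeSizeMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t", -1);
            if (fields.length > 1 && !fields[1].isEmpty()) {
                context.getCounter("data_quality", "shoe_size_present").increment(1);
                context.write(new Text(fields[0]), new Text(fields[1]));
            } else {
                // count it and move on -- don't let one dirty record kill the job
                context.getCounter("data_quality", "shoe_size_missing").increment(1);
            }
        }
    }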

Remember: you will (most likely) be running the same jobs over and over again, against increasingly larger data sets.  You will need a plan for dealing with old data, bad data, recycled data, etc…  And of course: your tests should validate that those relationships are preserved down to the last drop.  A single missing character or case inversion can lead to millions or billions of lost data points.

10) Counters.

Effectively evaluating the totality of a job is difficult when looking at logs, since a typical job might have many hundreds of tasks, each logged separately.  In contrast, counters are easily aggregated into a concise view which can be eyeballed in a matter of seconds.

  • It is easy to overlook that there are both TASK as well as JOB specific counters which are easily accessed through your hadoop job-tracker UI.  TASK specific counters can be accessed, for example, at the localhost’s job tracker “taskdetails.jsp” page, i.e. http://localhost:9100/taskdetails.jsp?tipid=<your_task_id_here>
    • TASK scoped counters will tell you if a specific task is having any issues.  If it is, you can check it out directly by looking at the logs.
    • JOB scoped counters are the counters hadoop prints out at the end of your job – and are the counters shown in the job tracker web ui.  These are, ultimately, the ones which you will rely on the most to evaluate a job’s success/failure.
    • Counters from FAILED jobs are removed from the overall tally by the JobTracker – this is something to beware of when you try to use counters to debug failing or buggy jobs.                                                                   
  • You can also use dynamic counters to increase the spectrum of attributes that you tally without overcomplicating your code.
    • Note: dynamic counters take up memory and can explode – so don’t use too many of them, and make sure they are bounded !
    • context.getCounter("my_cntr_group", "Value_success_" + isValidValue(x)).increment(1);
    • Use counters as “bins” of records created:
      • If you are outputting numbers from 1-1000, you can have counters which emit the log of these scores, so you have an idea of how many are above 10 versus how many are above 100 (as in the sketch below).
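
As a minimal sketch (extractScore() is a hypothetical helper, and the 1-1000 range is just the example above), binning by order of magnitude keeps the dynamic counter bounded to a handful of names:

    // a minimal sketch, inside a map() or reduce() method: bin scores from 1-1000 by
    // order of magnitude, so the dynamic counter only ever produces bins 0 through 3
    long score = extractScore(value);                            // hypothetical helper
    int bin = (int) Math.floor(Math.log10(Math.max(score, 1)));  // 1-9 -> 0, 10-99 -> 1, ...
    context.getCounter("score_bins", "magnitude_" + bin).increment(1);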

I might venture to say that counters get to the very essence of computing… in the cloud as well as in conventional programming.  In any case, they are certainly one of the simplest and most efficient debugging tools in the hadoop arsenal.  Our good pal mister avleen might think of this as a real-time, distributed “grep …… | wc -l”, which continually runs in the background.

11) When all else fails: Know how to find your nodes.

The cloud is like a good babysitter… it will free you of the mundane aspects of child-care…  But in case of emergencies, you’re gonna have to be on call.  Sometimes, things just break, somewhat stochastically.  Certain machines can act funny… or certain records can cause a particular reducer to go haywire.  In these cases, it’s nice to know that you can actually go straight to the scene of the crime.  These commands, taken from https://github.com/Yelp/mrjob/wiki/Accessing-Elastic-MapReduce-slave-nodes-via-SSH, can be absolute lifesavers in a pinch (again, of course, these are quite important for EMR where the machines are ephemeral).

Get the address of your master (name) node:

$> elastic-mapreduce --describe j-JOBFLOWID | grep MasterPublicDnsName | cut -d'"' -f4

List IPs of slave nodes, so you can ssh into them and hack about:

$> hadoop dfsadmin -report | grep ^Name | cut -f2 -d: | cut -f2 -d' '

Although hacking about on individual nodes is not going to lead to a production-deployable big-data software architecture (i.e. any approach to administering hadoop that requires regular node-to-node manual intervention is going to be very difficult to scale), such hackery is essential for certain types of root-cause analysis.  For example, I recently found that going around deleting jar files in a particular directory, and rerunning a job, rapidly demonstrated the cause of an error that was killing one of my jobs.  Of course, once you find the problem, it’s easy enough to add the fix as a new command in your EMR bootstrap script, or in puppet/chef or whatever machine builder you use.
