Insert large data into hive dynamic partitions

A dynamically partitioned Hive table stores raw data in partitioned form, which can help in further querying.

Select queries generally run faster on a partitioned table. It is always advisable to create an external table over the raw file in HDFS and then insert that data into the partitioned table.

But when we try to insert a really large file into a dynamically partitioned table, the insert often fails because too many files are opened at the mapper stage.

The following steps solve the issue of inserting massive data into a dynamically partitioned Hive table:

Let's say the table is defined as:

CREATE TABLE IF NOT EXISTS stocks_main(
exchange STRING,
symbol STRING,
price_open FLOAT,
price_high FLOAT,
price_low FLOAT,
price_close FLOAT,
volume INT,
price_adj_close FLOAT)
PARTITIONED BY (ymd STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/impadmin/NYSE_PARTITIONED';

and stocks is an external table created over the raw data in HDFS with the same schema.

Now we need to pull data from the stocks table into the partitioned stocks_main table.

We need to set the following properties to enable such inserts:

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.parallel=false;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;

Execute the following query to insert into the table:

insert into table stocks_main partition(ymd) select exchange, symbol, price_open, price_high, price_low, price_close, volume, price_adj_close, ymd from stocks;

This runs as a map-only job, and when too many files get opened the insert operation fails.

This can be sorted out by moving the file-writing operation to the reducer stage, i.e. by executing the above query with a "distribute by" clause:

insert into table stocks_main partition(ymd) select exchange, symbol, price_open, price_high, price_low, price_close, volume, price_adj_close, ymd from stocks distribute by ymd;

This converts the map-only job into a full map-reduce job, so that rows for the same ymd land on the same reducer and each partition is written by a single task, which fixes the problem of writing huge numbers of files to the Hive table.

 

Monitoring Spring Data jobs with Spring Batch Admin UI

We can launch Spring Data jobs using Spring Batch, as explained in my previous post. The Spring Integration + Spring Batch + Spring Data ETL can be monitored using Spring Batch Admin, which plays the same role as the Oozie web app displaying job status.

Changes to the previous post's example to attach Spring Batch Admin:

    • The job repository in the last example used in-memory persistence. To enable the Admin UI we need a shared database, so let us configure the app with a MySQL DB. Changes in application-context.xml:

<beans:bean id="jobRepository"
            class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <beans:property name="dataSource" ref="dataSource"/>
    <beans:property name="transactionManager" ref="transactionManager"/>
</beans:bean>

<beans:bean name="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <beans:property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <beans:property name="url" value="jdbc:mysql://localhost:3306/spring1"/>
    <beans:property name="username" value="root"/>
    <beans:property name="password" value="impetus"/>
</beans:bean>

  • Add the MySQL client driver jar in pom.xml:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.24</version>
</dependency>

  • Expand the Spring Batch Admin war file in Tomcat's webapps directory. You will find batch-hsql.properties and batch-mysql.properties in the WEB-INF/classes folder; change them to match our data source configuration.
  • Start Tomcat and hit the webapp's URL. This will generate all the relevant tables in MySQL; the same tables will be used by our app when a job is executed.

Spring Integration + Batch + Hive as ETL

Building an end-to-end pipeline to analyze data on Hadoop is a very common problem.

People have predominantly been solving this problem by applying ETL to get the data into the Hadoop ecosystem, for example with a technology stack of Apache Flume, an Oozie coordinator and Hive/Pig [https://github.com/cloudera/cdh-twitter-example]. We were designing a similar solution in which weblogs were frequently collected into a data staging directory and a few Hive queries needed to be executed on those logs to show reports in Tableau. Binding together that many different technologies was not very alluring, so we solved the same problem using Spring Integration + Spring Batch + Spring Data [HiveRunner]. In this stack all the layers are Spring based and hence can be easily integrated, and above that they are easy to maintain, test and modify.

Solution Steps :

    • Copy data to HDFS using a Spring Integration file inbound adapter, an HDFS outbound adapter and a polling mechanism. The same can be achieved by adding the following to the application context:

<int:channel id="filesChannel"/>

<file:inbound-channel-adapter id="localFileAdapter"
                              channel="filesChannel"
                              directory="<local path>"
                              filename-pattern="<file pattern>"> <!-- e.g. *.log -->
    <int:poller id="poller" fixed-delay="5000"/>
</file:inbound-channel-adapter>

<int:outbound-channel-adapter id="hdfsAdapter"
                              channel="filesChannel" ref="hiveBatchFlow"/>
    • Configure the Spring Batch job which invokes the hiveTasklet:
<beans:bean id="jobRepository"
            class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>
<beans:bean id="transactionManager"
            class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
<beans:bean id="jobLauncher"
            class="org.springframework.batch.core.launch.support.SimpleJobLauncher"
            p:jobRepository-ref="jobRepository"/>

<beans:bean class="org.springframework.batch.core.scope.StepScope">
    <beans:property name="proxyTargetClass" value="true"/>
</beans:bean>

<batch:job id="job1">
    <batch:step id="hive">
        <batch:tasklet ref="hiveTasklet"/>
    </batch:step>
</batch:job>
    • Configure the bean which is invoked when a new file is placed into the local file system directory. This class holds references to the batch job and the jobLauncher, and should extend AbstractReplyProducingMessageHandler:
<beans:bean id="hiveBatchFlow" class="com.example.HiveBatchFlowHandler">
    <beans:constructor-arg value="<hdfs location>"/>
    <beans:constructor-arg ref="hadoopConfiguration"/>
    <beans:constructor-arg ref="jobLauncher"/>
    <beans:constructor-arg ref="job1"/>
</beans:bean>
    • This class should override protected Object handleRequestMessage(Message<?> requestMessage). In this method we can invoke the Spring Batch job:

      jobLauncher.run(job, new JobParametersBuilder().toJobParameters());

      A rough sketch of such a handler is included at the end of this post.
  • So far we are able to add a poller to a local directory which picks up any file matching the configured filename pattern. When such a file is found, the HiveBatchFlowHandler is invoked and control passes to handleRequestMessage. Here we can fine-tune the HDFS directory path, and after copying the file to HDFS using the Hadoop FileSystem class we launch the Spring Batch job, which invokes a Hive step, i.e. executes the Hive script. To complete the picture we need to configure the Hive client and the Hive tasklet. This can be done by adding the following to the application context:
<hdp:configuration id="hadoopConfiguration">
    fs.default.name=${hd.fs}
</hdp:configuration>

<hdp:hive-client-factory host="${hive.host}" port="${hive.port}"/>

<!-- the tasklet is the same as shared on the Spring Data Hive page -->
<hdp:hive-tasklet id="hiveTasklet">
    <hdp:script location="apache-log-simple.hql">
        <arguments>
            hiveContribJar=${app.repo}/hive-contrib-0.8.1.jar
            localInPath="${app.home}/data/apache.log"
        </arguments>
    </hdp:script>
</hdp:hive-tasklet>
  • We used a Maven build very similar to the examples provided with the Spring Data project.

I will try to put the source code on GitHub as soon as possible.
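In the meantime, here is a rough sketch of the handler described above. Only the overall flow (copy the incoming file to HDFS with the Hadoop FileSystem API, then launch the Spring Batch job that wraps the Hive tasklet) comes from the steps; the class body, field names and the extra job parameter are my own assumptions, written against the Spring Integration 2.x and Spring Batch 2.x APIs in use at the time.

package com.example;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.integration.Message;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;

// Hypothetical sketch of the "hiveBatchFlow" bean; only the flow is taken from the steps above.
public class HiveBatchFlowHandler extends AbstractReplyProducingMessageHandler {

	private final String hdfsDir;
	private final Configuration hadoopConfiguration;
	private final JobLauncher jobLauncher;
	private final Job job;

	public HiveBatchFlowHandler(String hdfsDir, Configuration hadoopConfiguration,
			JobLauncher jobLauncher, Job job) {
		this.hdfsDir = hdfsDir;
		this.hadoopConfiguration = hadoopConfiguration;
		this.jobLauncher = jobLauncher;
		this.job = job;
	}

	@Override
	protected Object handleRequestMessage(Message<?> requestMessage) {
		// The file inbound adapter delivers the picked-up file as the message payload.
		File localFile = (File) requestMessage.getPayload();
		try {
			// Copy the file into HDFS using the plain Hadoop FileSystem API.
			FileSystem fs = FileSystem.get(hadoopConfiguration);
			fs.copyFromLocalFile(new Path(localFile.getAbsolutePath()),
					new Path(hdfsDir, localFile.getName()));

			// Launch the Spring Batch job that runs the Hive tasklet. A unique
			// parameter lets the same job definition run more than once.
			jobLauncher.run(job, new JobParametersBuilder()
					.addLong("run.ts", System.currentTimeMillis())
					.toJobParameters());
		} catch (Exception e) {
			throw new IllegalStateException("Hive batch flow failed for " + localFile, e);
		}
		// No reply is needed for a one-way outbound channel adapter.
		return null;
	}
}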

Executable jar file with dependent jars using Maven

Sometimes we have a simple project which is launched via a main method. If we build a jar using the Maven jar plugin, the dependencies are not shipped with the executable jar. One can bundle all the dependent jars using the Maven assembly plugin with the "jar-with-dependencies" descriptor.

Example :

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

The only issue with this approach is that it expands all the dependent jars and includes their class files and resources as part of the jar. It becomes difficult to distinguish your code from third-party class files, and sometimes files overshadow each other when two jars contain a file with the same name and path.

The ideal solution is to include the jars in a lib folder and have the MANIFEST.MF file of the main jar list all of them on the classpath.

This can be achieved with http://code.google.com/p/onejar-maven-plugin/. The issue with the onejar plugin is that it puts its own code in your artifact and its class loads your main class, a level of indirection we were not comfortable with.

Finally we zeroed in on the maven-assembly-plugin, which creates a zip file with a structure similar to a jar file; we just had to rename the zip file to a jar file.

Here are the steps which we followed :

1. Add the following two snippets to the pom.xml:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <version>2.4</version>
    <configuration>
        <archive>
            <manifest>
                <addClasspath>true</addClasspath>
                <classpathPrefix>lib/</classpathPrefix>
                <mainClass>myProj.Main</mainClass>
            </manifest>
        </archive>
    </configuration>
</plugin>

—————————————————-

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.4</version>
    <executions>
        <execution>
            <id>final-jar</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
            <configuration>
                <descriptors>
                    <descriptor>src/main/assembly/finalJar.xml</descriptor>
                </descriptors>
            </configuration>
        </execution>
    </executions>
</plugin>

2. Add all the dependencies and the jar name in pom.xml as we always do.

3. Create finalJar.xml in the src/main/assembly folder of your project.

4. Add the following to finalJar.xml:

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0">
    <id>distribution</id>
    <formats>
        <format>zip</format>
    </formats>
    <!-- include the main jar in a non-transitive manner, without a base directory and in expanded form -->
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>true</useProjectArtifact>
            <useTransitiveDependencies>true</useTransitiveDependencies>
            <unpack>true</unpack>
            <scope>runtime</scope>
            <fileMode>0755</fileMode>
            <directoryMode>0755</directoryMode>
            <includes>
                <include>project_groupid:project_artifactid</include>
            </includes>
        </dependencySet>

        <!-- include the dependent jars transitively, unexpanded, under lib/ -->
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <useTransitiveDependencies>true</useTransitiveDependencies>
            <unpack>false</unpack>
            <scope>runtime</scope>
            <fileMode>0644</fileMode>
            <directoryMode>0755</directoryMode>
            <outputDirectory>lib</outputDirectory>
            <includes></includes>
        </dependencySet>
    </dependencySets>
</assembly>


Thread safe access to HBase Tables using HTablePool and Spring HBase

Accessing an HBase table via HTable is not thread safe.

In order to access HTable instances from multiple threads, HTablePool or HbaseTemplate should be used. The latter is the DAO-style support provided by Spring; HTablePool is what the Apache HBase client library suggests.

Let's first discuss the HTablePool way.

HTablePool is not just a simple pool of HTable objects; it can handle thread-local objects as well. The pool can be initialized in either reusable or thread-local mode, and by supporting the ThreadLocal mode it takes away the pain of using ThreadLocal objects in application code.

In order to initialize the HTablePool as a thread-local pool, use the HTablePool constructor that takes a PoolType. When PoolType is set to ThreadLocal, the pool binds the table instance to the thread from which it is invoked.
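A minimal sketch of that usage, written against the 0.9x-era HBase client API the post refers to (the table name, row key and pool size here are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;

public class ThreadLocalPoolExample {

	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();

		// ThreadLocal mode: each calling thread gets its own HTable instance
		// bound to it, so application code needs no extra synchronization.
		HTablePool pool = new HTablePool(conf, 10, PoolType.ThreadLocal);

		HTableInterface table = pool.getTable("my_table"); // hypothetical table name
		try {
			Result row = table.get(new Get(Bytes.toBytes("row-1")));
			System.out.println("Row empty? " + row.isEmpty());
		} finally {
			// For a pooled table, close() returns the instance to the pool.
			table.close();
		}
	}
}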

This has been suggested as the de facto way to access HTable, but here we have to write boilerplate code around the pool, i.e. get the HTable, close the resources and handle checked exceptions. In short, we miss the support of Spring DAO.

The same thing can be achieved using the Spring Data Hadoop HBase module. It provides the HbaseTemplate class, which is thread safe, encapsulates all the boilerplate and provides the familiar Spring exception conversion. Example usage can be found at https://github.com/SpringSource/spring-data-book/tree/master/hadoop/hbase. That example has one issue in application-context.xml: the ZooKeeper properties need to be set on the HBase configuration object, which is missing from the example.
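As a hedged illustration, a DAO built on HbaseTemplate can look roughly like this (the table, column family and qualifier names are invented; the template itself would be defined in application-context.xml, with the ZooKeeper properties mentioned above, and injected here):

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;

public class UserDao {

	private final HbaseTemplate hbaseTemplate;

	public UserDao(HbaseTemplate hbaseTemplate) {
		this.hbaseTemplate = hbaseTemplate;
	}

	// Reads a single cell; no HTable handling or checked exceptions in sight.
	public String findName(String userId) {
		return hbaseTemplate.get("users", userId, new RowMapper<String>() {
			public String mapRow(Result result, int rowNum) throws Exception {
				return Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")));
			}
		});
	}
}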

The only issue with this approach is that it creates and destroys HTable objects on every method call, which negates the point of pooling. To avoid recreating HTable objects, one should use Spring's HbaseSynchronizationManager. It binds the HTable to the calling thread, again introducing the concept of thread-local objects; each subsequent call made through HbaseTemplate is aware of the bound table and uses it instead of retrieving a new instance. The binding can be done manually or through AOP interceptors using HbaseInterceptor (a manual-setting example can be found in TestTemplate); using the interceptor may affect performance.

The Spring way provides the same benefits we get when using Spring DAO support for an RDBMS.

Matrix Multiplication on Hadoop MapReduce

Matrix multiplication is a problem which does not inherently fit the MapReduce programming model, as it cannot easily be divided and conquered.

Matrix multiplication is an important step in many machine learning algorithms. The Mahout library provides an implementation of matrix multiplication over Hadoop; the problem with that implementation is that it starts only a single mapper task, because it uses CompositeInputFormat.

In order to calculate document similarity we had to multiply matrices of order [6000, 300] and [300, 25000]. When this was done with Mahout it took a lot of time.

Thus we implemented our own logic for the same.

Here are the steps to perform matrix multiplication:

Input :

1. Path 1, a sequence file where the key is of type IntWritable and the value is of type VectorWritable (see the Mahout library for reference), representing the first matrix.

2. Path 2, a sequence file where the key is of type IntWritable and the value is of type VectorWritable (see the Mahout library for reference), representing the second matrix.

Logic :

If we transpose the second matrix, the multiplication essentially becomes a Cartesian product between the two files. For example, consider M1 = [{1,2},{3,4},{5,6}] and M2 = [{A,B,C},{D,E,F}]; then M1M2 = [{1A+2D,1B+2E,1C+2F},{3A+4D,3B+4E,3C+4F},{5A+6D,5B+6E,5C+6F}].

Now M2' = [{A,D},{B,E},{C,F}].

One can perform a Cartesian product between the rows of M1 and M2' to arrive at the same result.

Steps :

1. Perform a transpose of the second file. A reference implementation can be found at:

http://grepcode.com/file/repo1.maven.org/maven2/org.apache.mahout/mahout-core/0.4/org/apache/mahout/math/hadoop/TransposeJob.java

2. Use CartesianInputFormat and CartesianRecordReader to calculate the input splits in order to parallelize the Cartesian product. A reference can be found at https://github.com/adamjshook/mapreducepatterns/tree/master/MRDP/src/main/java/mrdp/ch5 [from MapReduce Design Patterns].

It picks the input splits from the two input files and creates a list mapping each left-side input split to every right-side split. So if the first file has 3 splits and the second has 4, then we get 3*4 = 12 splits, and thus 12 mappers.

3. Write a mapper which takes the two vectors, multiplies them element by element and adds the products up, i.e. a dot product (see the sketch after these steps). Emit the left-side file's key as the output key, and a pair of the right-side key and the computed cell value as the output value.

4. Write a reducer which converts the collected pairs for each row into a VectorWritable object.
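Step 3 essentially boils down to a dot product. Here is a small illustrative helper using the Mahout vector classes (the class and method names are mine, not the actual mapper; the real mapper would invoke something like this for every pair of rows handed to it by a Cartesian input split):

import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class RowByColumn {

	// Cell (i, j) of M1 x M2 is the dot product of row i of M1 and
	// row j of the transposed M2 (i.e. column j of M2).
	public static double cell(VectorWritable leftRow, VectorWritable transposedRightRow) {
		Vector a = leftRow.get();
		Vector b = transposedRightRow.get();
		// multiply each index item and add them up
		return a.dot(b);
	}
}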

The job configuration will look like this:

job.setMapperClass(CartesianMultiplicationMapper.class);

job.setInputFormat(CartesianInputFormat.class);

CartesianInputFormat.setLeftInputInfo(job, SequenceFileInputFormat.class, "path1");
CartesianInputFormat.setRightInputInfo(job, SequenceFileInputFormat.class, "path2");

SequenceFileOutputFormat.setOutputPath(job, new Path("cartOutput"));

job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(VectorWritable.class);

I will provide the actual implementation on GitHub.

Unit Testing Hadoop Map Reduce Jobs

In this post we discuss various strategies to test and validate MapReduce jobs for Hadoop.

Because MapReduce is a parallel programming framework, it is a bit difficult to properly unit test and validate jobs from a developer's desk, let alone practice Test Driven Development.

We will focus on various ways to do unit testing for map reduce jobs.

In the post we will discuss how to validate map reduce output using :
1. JUnit framework to test mappers and reducers using mocking (Mockito)
2. MRUnit framework to completely test the flow but in a single JVM.
3. mini-HDFS and a mini-MapReduce cluster to perform Integration Testing.
4. Hadoop Inbuilt Counters
5. LocalJobRunner to debug jobs using local filesystem.

1. JUnit framework to test mappers and reducers using mocking (Mockito)

JUnit tests can easily be executed for MapReduce jobs provided we test the map function and the reduce function in isolation. We can also test the driver, but with the Spring Data Hadoop project [http://www.springsource.org/spring-data/hadoop] the driver configuration can be moved out of code, and using Spring Data beans further eases testing. If we execute the map and reduce functions in isolation, the only dependency is on the context object, which we can easily mock using Mockito [http://code.google.com/p/mockito/].

All the tests can be executed from the IDE. We just need the Hadoop distribution jars plus the Mockito and JUnit jars on the classpath.

Example to test the WordCount mapper. It works with Hadoop 1.0.3 and JUnit 4.1. We have mocked the Context class.

public class WordCountTest {

	private TokenizerMapper mapper;
	private Context context;
	final Map<Object,Object> test = new HashMap();
	final AtomicInteger counter = new AtomicInteger(0);

	@Before
	public void setUp() throws Exception {
		mapper = new TokenizerMapper();
		context = mock(Context.class);
	}

	@Test
	public void testMethod() throws IOException, InterruptedException {

		doAnswer(new Answer<Object>() {
			public Object answer(InvocationOnMock invocation) {
				Object[] args = invocation.getArguments();
				test.put(args[0].toString(), args[1].toString());
				counter.incrementAndGet();
				return "called with arguments: " + args;
			}
		}).when(context).write(any(Text.class),any(IntWritable.class));

		mapper.map(new LongWritable(1L), new Text("counter counter counter" +
		" test test test"), context);
		Map<String,String> actualMap = new HashMap<String, String>();
		actualMap.put("counter", "1");
		actualMap.put("test", "1");
		assertEquals(6,counter.get());
		assertEquals(actualMap, test);
	}
}

Along similar lines the reducer can be tested.

The key to using this strategy effectively is to refactor the code properly. Business-logic code should be moved out of the map and reduce methods, which makes that logic easy to test in isolation. We should also think about moving the mapper and reducer into separate top-level classes; this follows the strategy pattern and improves reusability.
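As a hedged illustration of that refactoring, the tokenizing logic of the WordCount mapper could live in a plain class that uses no Hadoop types at all and can be unit tested directly (the names are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class LineTokenizer {

	// Pure business logic: no Context, no Writable types, trivially testable.
	public List<String> tokens(String line) {
		List<String> result = new ArrayList<String>();
		StringTokenizer itr = new StringTokenizer(line);
		while (itr.hasMoreTokens()) {
			result.add(itr.nextToken());
		}
		return result;
	}
}

The mapper then simply iterates over tokens(value.toString()) and writes each token with a count of one.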

JUnit tests with Mockito are very easy to write. The only problem is that we cannot test the solution as a whole; at most they certify the business logic. We should consider other testing strategies to test the complete solution.

Hadoop: The Definitive Guide can be used as a reference [http://shop.oreilly.com/product/9780596521981.do].

2. MRUnit framework to completely test the flow but in a single JVM.

MRUnit is a testing framework which provides supporting structure to test MapReduce jobs. It offers mocking support that helps in testing the mapper, the reducer, the mapper+reducer combination and the driver as well. MRUnit [http://mrunit.apache.org/] is a top-level Apache project now, and it takes JUnit-style testing a level up for MapReduce jobs.

Example
We require the MRUnit and Mockito jars plus the supporting Hadoop jars. The test has been executed on Hadoop 0.20.203 with JUnit 4. We are testing the PiEstimator example provided with the Hadoop distribution.

public class TestExample {

	MapDriver<LongWritable, LongWritable, BooleanWritable,
	LongWritable> mapDriver;
	ReduceDriver<BooleanWritable, LongWritable, WritableComparable<?>,
	Writable> reduceDriver;
	MapReduceDriver<LongWritable, LongWritable, BooleanWritable,
	LongWritable, WritableComparable<?>, Writable> mapReduceDriver;

	@Before
	public void setUp() {
		PiEstimator.PiMapper mapper = new PiEstimator.PiMapper();
		PiEstimator.PiReducer reducer = new PiEstimator.PiReducer();
		mapDriver = new MapDriver<LongWritable, LongWritable,
		BooleanWritable, LongWritable>();
		mapDriver.setMapper(mapper);
		reduceDriver = new ReduceDriver<BooleanWritable, LongWritable,
		WritableComparable<?>, Writable>();
		reduceDriver.setReducer(reducer);
		mapReduceDriver = new MapReduceDriver<LongWritable, LongWritable,
		BooleanWritable,LongWritable,
		WritableComparable<?>, Writable>();
		mapReduceDriver.setMapper(mapper);
		mapReduceDriver.setReducer(reducer);
	}

	@Test
	public void testMapper() {
		mapDriver.withInput(new LongWritable(10), new LongWritable(10));
		mapDriver.withOutput(new BooleanWritable(true), new LongWritable(10));
		mapDriver.addOutput(new BooleanWritable(false), new LongWritable(0));
		mapDriver.runTest();
	}

	@Test
	public void testReducer() {
		List<LongWritable> values = new ArrayList<LongWritable>();
		values.add(new LongWritable(10));
		reduceDriver.withInput(new BooleanWritable(true), values);

		reduceDriver.runTest();
	}
}

These tests are extremely fast as they require no interaction with the file system. They are very good, but lack support for testing code in a distributed environment. Please check
http://mrunit.apache.org/documentation/javadocs/0.9.0-incubating/org/apache/hadoop/mrunit/mock/package-summary.html
for other useful support classes. These tests can be sufficient to test code in isolation, but they do not test interaction with HDFS or execution on a cluster.

3. mini-HDFS and a mini-MapReduce cluster to perform Integration Testing

There are certain issues which can be caught only in an integration test; for example, problems with state kept in instance variables may surface only when the job executes across separate JVMs on a cluster. Hadoop has support for launching a dummy cluster to create such a testing environment. The supporting classes for the dummy cluster are MiniDFSCluster, MiniMRCluster and ClusterMapReduceTestCase; Hadoop internally uses these classes for its own tests. This setup launches a NameNode with two DataNodes, and a mini MapReduce cluster with a JobTracker and two TaskTrackers.

Test setup:
The classpath should have hadoop-core.jar (I executed the test on 0.20.203), hadoop-default.xml, hadoop-test.jar and all the Jetty-related jars, which can be found in the lib folder.

Set the following system property:

System.setProperty("hadoop.log.dir", "test_dir");

This is the directory where the dummy cluster writes and reads files and logs. It should exist already, or be (re)created during setup.

If you get an XML parsing error, also set
System.setProperty("javax.xml.parsers.SAXParserFactory",
"com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");

We will be testing the most common example, i.e. WordCount, using JUnit 4 and Hadoop 0.20.203.

This example creates a mini file system and MR cluster; we also check the counters.

public class WordCountTest {

	private TokenizerMapper mapper;
	private Context context;
	final Map<Object,Object> test = new HashMap();
	final AtomicInteger counter = new AtomicInteger(0);
	private MiniDFSCluster dfsCluster = null;
	private MiniMRCluster mrCluster = null;

	private final Path input = new Path("input");
	private final Path output = new Path("output");

	@Before
	public void setUp() throws Exception {
		new File("NCHAPLOT_LOG").mkdirs();
		System.setProperty("hadoop.log.dir", "NCHAPLOT_LOG");
		final String rootLogLevel =
		System.getProperty("virtual.cluster.logLevel","WARN");
		final String testLogLevel = System.getProperty("test.log.level", "INFO");
		System.setProperty("javax.xml.parsers.SAXParserFactory",
		"com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");
		// LOG.info("Setting Log Level to " + rootLogLevel);
		LogManager.getRootLogger().setLevel(Level.toLevel(rootLogLevel));
		Configuration conf = new Configuration();
		dfsCluster = new MiniDFSCluster(conf, 1, true, null);
		dfsCluster.getFileSystem().makeQualified(input);
		dfsCluster.getFileSystem().makeQualified(output);

		assertNotNull("Cluster has a file system", dfsCluster.getFileSystem());
		mrCluster = new MiniMRCluster(1,
		dfsCluster.getFileSystem().getUri().toString(), 1);
		mapper = new TokenizerMapper();
		context = mock(Context.class);

	}

	protected FileSystem getFileSystem() throws IOException {
		return dfsCluster.getFileSystem();
	}

	private void createInput() throws IOException {
		Writer wr = new OutputStreamWriter(getFileSystem().create(new Path(input, "wordcount")));
		wr.write("neeraj chaplot neeraj\n");
		wr.close();
	}

	@Test
	public void testJob() throws IOException,
	InterruptedException, ClassNotFoundException {
		Configuration conf = mrCluster.createJobConf();

		createInput();

		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setNumReduceTasks(1);
		FileInputFormat.addInputPath(job,input);
		FileOutputFormat.setOutputPath(job, output);

		job.waitForCompletion(true);

		final String COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
		Counters ctrs = job.getCounters();
		System.out.println("Counters: " + ctrs);
		long combineIn = ctrs.findCounter(COUNTER_GROUP,
		"COMBINE_INPUT_RECORDS").getValue();
		long combineOut = ctrs.findCounter(COUNTER_GROUP,
		"COMBINE_OUTPUT_RECORDS").getValue();
		long reduceIn = ctrs.findCounter(COUNTER_GROUP,
		"REDUCE_INPUT_RECORDS").getValue();
		long mapOut = ctrs.findCounter(COUNTER_GROUP,
		"MAP_OUTPUT_RECORDS").getValue();
		long reduceOut = ctrs.findCounter(COUNTER_GROUP,
		"REDUCE_OUTPUT_RECORDS").getValue();
		long reduceGrps = ctrs.findCounter(COUNTER_GROUP,
		"REDUCE_INPUT_GROUPS").getValue();

		assertEquals("map out = combine in", mapOut, combineIn);
		assertEquals("combine out = reduce in", combineOut, reduceIn);
		assertTrue("combine in > combine out", combineIn > combineOut);
		assertEquals("reduce groups = reduce out", reduceGrps, reduceOut);

		InputStream is = getFileSystem().open(new Path(output,
		"part-r-00000"));
		BufferedReader reader = new BufferedReader(new
		InputStreamReader(is));

		assertEquals("chaplot\t1", reader.readLine());
		assertEquals("neeraj\t2", reader.readLine());
		assertNull(reader.readLine());
		reader.close();
	}

	@After
	public void tearDown() throws Exception {
		if (dfsCluster != null) {
			dfsCluster.shutdown();
		}
		if (mrCluster != null) {
			mrCluster.shutdown();
		}
	}
}

These tests are useful if we want to exercise the code on a cluster from the IDE without launching a separate cluster. Keep in mind this will not help us in debugging, and these tests are time consuming; if possible, the code in @Before and @After should be moved to @BeforeClass and @AfterClass methods. Still, this is the most concrete way to validate our job.

The only issue we observed here was the time taken to execute a test. More information can be found in the Pro Hadoop book [http://www.amazon.com/dp/B008PHZ3A2] and in the examples provided with Hadoop: The Definitive Guide. Hadoop also ships tests written using the same support classes; please check TestMapReduceLocal.java. There are many other utility classes provided with the Hadoop code which help in testing, such as MapReduceTestUtil.

4. Hadoop Inbuilt Counters

Counters help in the quantitative analysis of a job. They provide aggregated statistics at the end of the run and can therefore be used to validate the output. Hadoop provides built-in as well as user-defined counters. We can inspect them using APIs in the driver class, and all counters are also listed at the end of the job's output logs.
The best thing about counters is that they work at the cluster level, i.e. they provide aggregated information across all the mappers and reducers.

Built-in Counters
Hadoop provides built-in counters that report information about each phase of a particular job.

A few important ones from a debugging and testing perspective:
MAP_INPUT_RECORDS — number of input records consumed by all the maps.
MAP_OUTPUT_RECORDS — number of output records produced by all the maps.
REDUCE_INPUT_RECORDS — number of input records consumed by all the reducers.
REDUCE_OUTPUT_RECORDS — number of output records produced by all the reducers.

User-Defined Java Counters
We can define our own counters to report the state of a job; their output comes back as a map of counter names to values. There are two ways to create and access counters: enums and Strings. Enums are easier and type safe, and should be used when we know all the output states in advance; enum-based counters are best suited for cases such as counting requests per HTTP response code. String-based counters are dynamic and can be used where we do not have that visibility in advance, for example counting requests per domain.
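As a hedged sketch of the String-based (dynamic) flavour (the group name, counter names and parsing logic are all made up), before the enum-based example below:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DomainCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	private static final IntWritable ONE = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Assume the first field of the log line is the requested domain.
		String domain = value.toString().split("\\s+")[0];
		// Dynamic counter: the counter name is not known in advance.
		context.getCounter("DomainCounters", domain).increment(1);
		context.write(new Text(domain), ONE);
	}
}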

Example

Consider the simple WordCount example. Let's try to find out whether we are processing all the rows or not.

We will read the MAP_INPUT_RECORDS counter to know how many rows were presented as input, and use two enum counters to count the number of null and non-null rows.

public class WordCount {

	public static class TokenizerMapper
	extends Mapper<Object, Text, Text, IntWritable>{

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context
		) throws IOException, InterruptedException {
			//incrementing the counters
			if(value == null || value.toString().equals("")) {
				context.getCounter(State.NULL).increment(1);
			}else {
				context.getCounter(State.NOT_NULL).increment(1);
			}
			StringTokenizer itr = new StringTokenizer(value.toString());

			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer
	extends Reducer<Text,IntWritable,Text,IntWritable> {
	private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values,
		Context context
		) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	//defining the enum
	enum State {
		NULL,
		NOT_NULL
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

		Job job = new Job(conf, "word count");
		//for brevity purpose full job config not shown
		job.waitForCompletion(true);
		//reading all the counters
		long inputCount =
		job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter",
		"MAP_INPUT_RECORDS").getValue();
		System.out.println("Total Input Rows ::::"+inputCount);
		System.out.println("Not Null Rows ==="    +job.getCounters().findCounter
		(State.NOT_NULL).getValue());
		System.out.println(" Null Rows ==="    +job.getCounters().findCounter
		(State.NULL).getValue());
		System.exit( 0);
	}

}

The sum of the null and non-null rows can be compared with MAP_INPUT_RECORDS to make sure that all rows were processed. This is a very simple example; the good thing is that we only have to look at a little counter information to analyse the job state. The example runs on Hadoop 1.0.3.

Counters are best suited for scenarios where we want to validate output at an aggregated level, such as whether all rows were processed.

We agree that from a purist point of view this does not qualify as a unit testing method; it basically minimizes the information we have to check in order to validate the job, and it is a simple and informative first-level validation of the results. An important point to note is that this method requires the job to be executed on a Hadoop setup.

For reference, an exhaustive list of built-in counters can be found in Hadoop: The Definitive Guide.

5. LocalJobRunner to debug jobs using local filesystem

LocalJobRunner is more helpful for debugging a job than for testing it. It runs MapReduce jobs in a single JVM, so they can easily be debugged from the IDE, and it lets us run the job against the local file system.

To enable job execution using LocalJobRunner, set:

conf.set("mapred.job.tracker", "local");

In case we want to use the local file system for input/output, also set:

conf.set("fs.default.name", "local");
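A minimal sketch of a driver wired this way (the paths are made up and the mapper/reducer references assume the WordCount classes from the counters example above; on Hadoop 2+ the equivalent settings are mapreduce.framework.name=local and fs.defaultFS=file:///):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LocalDebugDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("mapred.job.tracker", "local"); // run map and reduce inside this JVM
		conf.set("fs.default.name", "file:///"); // local file system ("local" also works on 1.x)

		Job job = new Job(conf, "word count (local debug)");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(WordCount.TokenizerMapper.class);
		job.setReducerClass(WordCount.IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path("target/test-input"));
		FileOutputFormat.setOutputPath(job, new Path("target/test-output"));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}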

There are a few limitations to this approach, such as support for only a single reducer and the lack of a distributed environment, but it is a very easy way to debug a job.

Conclusion

We are big fans of TDD, and we hope this post helps you understand the various techniques for testing MapReduce jobs. Not all of these tests may be necessary, but each has a different capability that helps mature the solution: some require a cluster, some require mocking, some can be executed from the IDE, some are very fast and some test the complete solution.

References :

https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+Tutorial
Hadoop MapReduce Tutorial [http://hadoop.apache.org/docs/r0.20.203.0/mapred_tutorial.html]
Hadoop: The Definitive Guide, by Tom White [http://shop.oreilly.com/product/9780596521981.do]
Pro Hadoop[http://www.amazon.com/dp/B008PHZ3A2]