Monitoring Spring Data jobs with Spring Batch Admin UI

We can launch Spring Data jobs using Spring Batch, as explained in my previous post. The Spring Integration + Spring Batch + Spring Data ETL can be monitored using Spring Batch Admin, much like the Oozie web app displays job status.

Changes to the previous post's example to attach Spring Batch Admin:

    • The job repository in the last example used in-memory persistence. To enable the Admin UI we need a shared database, so let us configure the app with a MySQL database. Changes in application-context.xml (the transactionManager bean it references is sketched after the snippet):

<beans:bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <beans:property name="dataSource" ref="dataSource"/>
    <beans:property name="transactionManager" ref="transactionManager" />
	</beans:bean>
	

	<beans:bean name="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<beans:property name="driverClassName" value="com.mysql.jdbc.Driver"/>
		<beans:property name="url" value="jdbc:mysql://localhost:3306/spring1"/>
		<beans:property name="username" value="root"/>
		<beans:property name="password" value="impetus"/>
	</beans:bean>
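
The jobRepository above references a transactionManager bean that was not part of the snippet; a minimal sketch, assuming the standard Spring DataSourceTransactionManager over the same datasource, would be:

<!-- Assumption: a plain DataSourceTransactionManager wired to the MySQL datasource above -->
<beans:bean id="transactionManager"
    class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <beans:property name="dataSource" ref="dataSource"/>
</beans:bean>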

  • Add the MySQL JDBC driver dependency in pom.xml:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.24</version>
</dependency>

  • Expand the WAR file in Tomcat's webapps directory. You will find batch-hsql.properties and batch-mysql.properties in the WEB-INF/classes folder. Change them to match our configuration (a sample is shown below).
  • Start Tomcat and hit the URL to load the webapp. This will generate all the relevant tables in MySQL. The same tables will be used by our app whenever a job is executed.
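
For reference, a batch-mysql.properties consistent with the datasource above would look roughly like the following (the batch.jdbc.* keys follow the Spring Batch Admin sample files; the values are simply our configuration):

# Assumed keys from the Spring Batch Admin sample batch-mysql.properties
batch.jdbc.driver=com.mysql.jdbc.Driver
batch.jdbc.url=jdbc:mysql://localhost:3306/spring1
batch.jdbc.user=root
batch.jdbc.password=impetus

Spring Batch Admin typically selects the active properties file through the ENVIRONMENT property (for example ENVIRONMENT=mysql as a system property or environment variable for Tomcat), so make sure it points to the MySQL file.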

Matrix Multiplication on Hadoop MapReduce

Matrix multiplication is a problem that does not naturally fit the MapReduce programming model, as it cannot be straightforwardly divided and conquered.

Matrix multiplication is an important step in many machine learning algorithms. The Mahout library provides an implementation of matrix multiplication over Hadoop. The problem with that implementation is that it starts only a single mapper task, because it uses CompositeInputFormat.

In order to calculate document similarity we had to multiply matrices of order [6000, 300] and [300, 25000]. When this was done with Mahout it took a lot of time.

Thus we implemented our own logic for the same.

Here are the steps to perform matrix multiplication:

Input:

1.  Path 1, a sequence file where the key is of type IntWritable and the value is of type VectorWritable [please check the Mahout library for reference], representing the first matrix.

2.  Path 2, a sequence file where the key is of type IntWritable and the value is of type VectorWritable [please check the Mahout library for reference], representing the second matrix.

Logic:

If we transpose the second matrix, the problem essentially becomes a Cartesian product between the two files. For example, consider M1 = [{1,2},{3,4},{5,6}] and M2 = [{A,B,C},{D,E,F}]; then M1M2 = [{1A+2D,1B+2E,1C+2F},{3A+4D,3B+4E,3C+4F},{5A+6D,5B+6E,5C+6F}].

Now M2′ = [{A,D},{B,E},{C,F}].

One can perform a Cartesian product between M1 and M2′ to arrive at the same result.

Steps:

1. Transpose the second file. A reference implementation can be found at:

http://grepcode.com/file/repo1.maven.org/maven2/org.apache.mahout/mahout-core/0.4/org/apache/mahout/math/hadoop/TransposeJob.java

2. Use CartesianInputFormat and CartesianRecordReader to compute the input splits in order to parallelize the Cartesian product. A reference implementation can be found at https://github.com/adamjshook/mapreducepatterns/tree/master/MRDP/src/main/java/mrdp/ch5 [from: MapReduce Design Patterns].

This input format picks the input splits from the two input files and creates a list pairing each left-side split with each right-side split. So if the first file has 3 splits and the second has 4, we get 3*4 = 12 splits and therefore 12 mappers.

3. Write a mapper which takes the two vectors, multiplies them element by element and adds the products up. Emit the left-side file's key as the output key, and a pair of the right-side key and the computed cell value as the output value.

4. Write a reducer which converts those pair objects into a VectorWritable object (a rough sketch of both classes follows this list).
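
As a rough sketch of steps 3 and 4 (not the actual implementation, which we will share on GitHub): the code below uses the old mapred API that CartesianInputFormat targets, and assumes the Cartesian record reader hands each map() call the left record as the key and the right record as the value, each serialized as "rowIndex<TAB>v1,v2,...". The parse helpers and the "j:value" intermediate encoding are introduced here purely for illustration.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

// Step 3 sketch: one dot product per (left row, right row) pair produced by the Cartesian input format.
// (Classes are shown package-private so both sketches fit in one file.)
class CartesianMultiplicationMapper extends MapReduceBase
        implements Mapper<Text, Text, IntWritable, Text> {

    public void map(Text leftRecord, Text rightRecord,
                    OutputCollector<IntWritable, Text> output, Reporter reporter)
            throws IOException {
        int i = parseIndex(leftRecord);          // row i of the first matrix
        int j = parseIndex(rightRecord);         // row j of the transposed second matrix
        double[] left = parseVector(leftRecord);
        double[] right = parseVector(rightRecord);

        // Dot product of row i of M1 with row j of M2' (i.e. column j of M2).
        double sum = 0;
        for (int k = 0; k < left.length; k++) {
            sum += left[k] * right[k];
        }
        output.collect(new IntWritable(i), new Text(j + ":" + sum));
    }

    // Hypothetical helpers for the assumed "rowIndex<TAB>v1,v2,..." record format.
    private int parseIndex(Text record) {
        return Integer.parseInt(record.toString().split("\t")[0]);
    }

    private double[] parseVector(Text record) {
        String[] parts = record.toString().split("\t")[1].split(",");
        double[] v = new double[parts.length];
        for (int k = 0; k < parts.length; k++) {
            v[k] = Double.parseDouble(parts[k]);
        }
        return v;
    }
}

// Step 4 sketch: assemble all (j, value) cells of output row i into a single VectorWritable.
class CartesianMultiplicationReducer extends MapReduceBase
        implements Reducer<IntWritable, Text, IntWritable, VectorWritable> {

    public void reduce(IntWritable row, Iterator<Text> values,
                       OutputCollector<IntWritable, VectorWritable> output, Reporter reporter)
            throws IOException {
        Vector rowVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
        while (values.hasNext()) {
            String[] pair = values.next().toString().split(":");
            rowVector.set(Integer.parseInt(pair[0]), Double.parseDouble(pair[1]));
        }
        output.collect(row, new VectorWritable(rowVector));
    }
}

With this intermediate encoding the job would also need job.setMapOutputKeyClass(IntWritable.class) and job.setMapOutputValueClass(Text.class) in addition to the configuration shown below; the real implementation reads the VectorWritable values directly from the sequence files instead of reparsing text, but the structure of the map and reduce steps stays the same.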

Job configuration will look like:

job.setMapperClass(CartesianMultiplicationMapper.class);
job.setInputFormat(CartesianInputFormat.class);

CartesianInputFormat.setLeftInputInfo(job, SequenceFileInputFormat.class,
        "path1");
CartesianInputFormat.setRightInputInfo(job, SequenceFileInputFormat.class,
        "path2");

SequenceFileOutputFormat.setOutputPath(job, new Path("cartOutput"));

job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(VectorWritable.class);

We will provide the actual implementation on GitHub.

Unit Testing Hadoop Map Reduce Jobs

In this post we discuss various strategies to test and validate MapReduce jobs for Hadoop.

MapReduce being a parallel programming framework, it is a bit difficult to properly unit test and validate jobs from a developer's point of view, let alone practice test-driven development.

We will focus on various ways to unit test MapReduce jobs.

In this post we will discuss how to validate MapReduce output using:
1. JUnit framework to test mappers and reducers using mocking (Mockito)
2. MRUnit framework to completely test the flow but in a single JVM.
3. mini-HDFS and a mini-MapReduce cluster to perform Integration Testing.
4. Hadoop Inbuilt Counters
5. LocalJobRunner to debug jobs using local filesystem.

1. JUnit framework to test mappers and reducers using mocking (Mockito)

JUnit tests can easily be written for MapReduce jobs provided we test the map function and the reduce function in isolation. We can also test the driver, and with the Spring Data Hadoop project [http://www.springsource.org/spring-data/hadoop] the driver configuration can be moved out of code; using Spring Data beans can further ease testing. If we execute the map and reduce functions in isolation, the only dependency is on the Context object, which we can easily mock using Mockito [http://code.google.com/p/mockito/].

All the tests can be executed from the IDE. We just need the Hadoop distribution JARs plus the Mockito and JUnit JARs on the classpath.

Example testing the WordCount mapper. It works with Hadoop 1.0.3 and JUnit 4.1. We have mocked the Context class.

public class WordCountTest {

	private TokenizerMapper mapper;
	private Context context;
	final Map<Object,Object> test = new HashMap<Object, Object>();
	final AtomicInteger counter = new AtomicInteger(0);

	@Before
	public void setUp() throws Exception {
		mapper = new TokenizerMapper();
		context = mock(Context.class);
	}

	@Test
	public void testMethod() throws IOException, InterruptedException {

		// Capture every context.write() call: record the key/value pair and count the invocations.
		doAnswer(new Answer<Object>() {
			public Object answer(InvocationOnMock invocation) {
				Object[] args = invocation.getArguments();
				test.put(args[0].toString(), args[1].toString());
				counter.incrementAndGet();
				return "called with arguments: " + args;
			}
		}).when(context).write(any(Text.class),any(IntWritable.class));

		mapper.map(new LongWritable(1L), new Text("counter counter counter" +
		" test test test"), context);
		Map<String,String> actualMap = new HashMap<String, String>();
		actualMap.put("counter", "1");
		actualMap.put("test", "1");
		assertEquals(6,counter.get());
		assertEquals(actualMap, test);
	}
}

The reducer can be tested along the same lines.
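
For example, here is a minimal sketch for the WordCount IntSumReducer (the test and class names are ours; as in the mapper test above, the Context is mocked):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.junit.Before;
import org.junit.Test;

// IntSumReducer is the reducer from the WordCount example (import it from wherever it lives in your project).
@SuppressWarnings({ "rawtypes", "unchecked" })
public class IntSumReducerTest {

	private IntSumReducer reducer;
	private Reducer.Context context;

	@Before
	public void setUp() {
		reducer = new IntSumReducer();
		context = mock(Reducer.Context.class);
	}

	@Test
	public void testReduce() throws IOException, InterruptedException {
		List<IntWritable> values = Arrays.asList(
				new IntWritable(1), new IntWritable(1), new IntWritable(1));

		reducer.reduce(new Text("counter"), values, context);

		// The reducer should emit the key exactly once, with the summed count.
		verify(context).write(new Text("counter"), new IntWritable(3));
	}
}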

The key to using this strategy effectively is to refactor the code properly. Business-logic code should be moved out of the map and reduce methods; it helps in testing the business logic effectively. We should also consider moving the mapper and reducer into separate classes. This follows the strategy pattern and improves reusability.

JUnit tests with Mockito are very easy to use. The only problem is that we cannot test the solution as a whole; at most it certifies the business logic. We should consider other testing strategies to test the complete solution.

Hadoop: The Definitive Guide can be used as a reference [http://shop.oreilly.com/product/9780596521981.do].

2. MRUnit framework to completely test the flow but in a single JVM.

MRUnit is a testing framework which provides a support structure to test MapReduce jobs. It offers mocking support that is helpful in testing the Mapper, the Reducer, the Mapper+Reducer combination and the Driver as well. http://mrunit.apache.org/ is a top-level Apache project now. It takes JUnit mocking a level up for MapReduce job testing.

Example
We require the MRUnit and Mockito JARs and the supporting Hadoop JARs. The test has been executed on Hadoop 0.20.203 with JUnit 4. We are testing the PiEstimator example provided with the Hadoop distribution.

public class TestExample {

	MapDriver<LongWritable, LongWritable, BooleanWritable,
	LongWritable> mapDriver;
	ReduceDriver<BooleanWritable, LongWritable, WritableComparable<?>,
	Writable> reduceDriver;
	MapReduceDriver<LongWritable, LongWritable, BooleanWritable,
	LongWritable, WritableComparable<?>, Writable> mapReduceDriver;

	@Before
	public void setUp() {
		PiEstimator.PiMapper mapper = new PiEstimator.PiMapper();
		PiEstimator.PiReducer reducer = new PiEstimator.PiReducer();
		mapDriver = new MapDriver<LongWritable, LongWritable,
		BooleanWritable, LongWritable>();
		mapDriver.setMapper(mapper);
		reduceDriver = new ReduceDriver<BooleanWritable, LongWritable,
		WritableComparable<?>, Writable>();
		reduceDriver.setReducer(reducer);
		mapReduceDriver = new MapReduceDriver<LongWritable, LongWritable,
		BooleanWritable,LongWritable,
		WritableComparable<?>, Writable>();
		mapReduceDriver.setMapper(mapper);
		mapReduceDriver.setReducer(reducer);
	}

	@Test
	public void testMapper() {
		mapDriver.withInput(new LongWritable(10), new LongWritable(10));
		mapDriver.withOutput(new BooleanWritable(true), new LongWritable(10));
		mapDriver.addOutput(new BooleanWritable(false), new LongWritable(0));
		mapDriver.runTest();
	}

	@Test
	public void testReducer() {
		List<LongWritable> values = new ArrayList<LongWritable>();
		values.add(new LongWritable(10));
		reduceDriver.withInput(new BooleanWritable(true), values);

		reduceDriver.runTest();
	}
}

These tests are extremely fast as we don't require any interaction with the filesystem. They are very good but lack support for testing code in a distributed environment. Please check
http://mrunit.apache.org/documentation/javadocs/0.9.0-incubating/org/apache/hadoop/mrunit/mock/package-summary.html
for other useful support classes. These tests can be sufficient to test code in isolation, but they don't test interaction with HDFS or execution on a cluster.

3. mini-HDFS and a mini-MapReduce cluster to perform Integration Testing

There are certain issues which can be caught only in an integration test; for example, state kept in object variables may only surface as a problem when the job actually executes on a cluster. Hadoop has support to launch a dummy cluster to create a testing environment. The supporting classes for the dummy cluster are MiniDFSCluster, MiniMRCluster and ClusterMapReduceTestCase; Hadoop internally uses these classes for its own tests. The setup launches two DataNodes and a NameNode, and a mini-MapReduce cluster with two TaskTrackers and a JobTracker.

Test setup:
The classpath should have hadoop-core.jar (the test was executed on 0.20.203), hadoop-default.xml, hadoop-test.jar and all the jetty-related JARs, which can be found in the lib folder (a Maven equivalent is sketched below).
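
If the project is Maven based, roughly equivalent dependencies would be the following (the artifact coordinates for the 0.20.203 line are our assumption; the jetty JARs may still need to be added separately if they are not pulled in transitively):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>0.20.203.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-test</artifactId>
    <version>0.20.203.0</version>
    <scope>test</scope>
</dependency>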

Set the following system property:

System.setProperty("hadoop.log.dir", "test_dir");

This is the directory where the dummy cluster reads and writes files and logs. It should exist, or be (re)created during setup.

If you get an XML parsing error, set:
System.setProperty("javax.xml.parsers.SAXParserFactory",
"com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");

We will be testing the most common example, i.e. WordCount, using JUnit 4 and Hadoop 0.20.203.

This example creates a mini filesystem and a mini MapReduce cluster. We also check the counters in this example.

public class WordCountTest {

	private TokenizerMapper mapper;
	private Context context;
	final Map<Object,Object> test = new HashMap();
	final AtomicInteger counter = new AtomicInteger(0);
	private MiniDFSCluster dfsCluster = null;
	private MiniMRCluster mrCluster = null;

	private final Path input = new Path("input");
	private final Path output = new Path("output");

	@Before
	public void setUp() throws Exception {
		new File("NCHAPLOT_LOG").mkdirs();
		System.setProperty("hadoop.log.dir", "NCHAPLOT_LOG");
		final String rootLogLevel =
		System.getProperty("virtual.cluster.logLevel","WARN");
		final String testLogLevel = System.getProperty("test.log.level", "INFO");
		System.setProperty("javax.xml.parsers.SAXParserFactory",
		"com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");
		// LOG.info("Setting Log Level to " + rootLogLevel);
		LogManager.getRootLogger().setLevel(Level.toLevel(rootLogLevel));
		Configuration conf = new Configuration();
		dfsCluster = new MiniDFSCluster(conf, 1, true, null);
		dfsCluster.getFileSystem().makeQualified(input);
		dfsCluster.getFileSystem().makeQualified(output);

		assertNotNull("Cluster has a file system", dfsCluster.getFileSystem());
		mrCluster = new MiniMRCluster(1,
		dfsCluster.getFileSystem().getUri().toString(), 1);
		mapper = new TokenizerMapper();
		context = mock(Context.class);

	}

	protected FileSystem getFileSystem() throws IOException {
		return dfsCluster.getFileSystem();
	}

	private void createInput() throws IOException {
		Writer wr = new OutputStreamWriter(getFileSystem().create(new Path(input, "wordcount")));
		wr.write("neeraj chaplot neeraj\n");
		wr.close();
	}

	@Test
	public void testJob() throws IOException,
	InterruptedException, ClassNotFoundException {
		Configuration conf = mrCluster.createJobConf();

		createInput();

		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setNumReduceTasks(1);
		FileInputFormat.addInputPath(job,input);
		FileOutputFormat.setOutputPath(job, output);

		job.waitForCompletion(true);

		final String COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
		Counters ctrs = job.getCounters();
		System.out.println("Counters: " + ctrs);
		long combineIn = ctrs.findCounter(COUNTER_GROUP,
		"COMBINE_INPUT_RECORDS").getValue();
		long combineOut = ctrs.findCounter(COUNTER_GROUP,
		"COMBINE_OUTPUT_RECORDS").getValue();
		long reduceIn = ctrs.findCounter(COUNTER_GROUP,
		"REDUCE_INPUT_RECORDS").getValue();
		long mapOut = ctrs.findCounter(COUNTER_GROUP,
		"MAP_OUTPUT_RECORDS").getValue();
		long reduceOut = ctrs.findCounter(COUNTER_GROUP,
		"REDUCE_OUTPUT_RECORDS").getValue();
		long reduceGrps = ctrs.findCounter(COUNTER_GROUP,
		"REDUCE_INPUT_GROUPS").getValue();

		assertEquals("map out = combine in", mapOut, combineIn);
		assertEquals("combine out = reduce in", combineOut, reduceIn);
		assertTrue("combine in > combine out", combineIn > combineOut);
		assertEquals("reduce groups = reduce out", reduceGrps, reduceOut);

		InputStream is = getFileSystem().open(new Path(output,
		"part-r-00000"));
		BufferedReader reader = new BufferedReader(new
		InputStreamReader(is));

		assertEquals("chaplot\t1", reader.readLine());
		assertEquals("neeraj\t2", reader.readLine());
		assertNull(reader.readLine());
		reader.close();
	}

	@After
	public void tearDown() throws Exception {
		if (dfsCluster != null) {
			dfsCluster.shutdown();
		}
		if (mrCluster != null) {
			mrCluster.shutdown();
		}
	}
}

These tests are useful if we want to exercise code on a cluster from the IDE without launching a separate cluster. Keep in mind this won't help us with debugging, and these tests are time consuming. If possible, the code in the @Before and @After methods should be moved to @BeforeClass and @AfterClass methods. This is the most concrete way to validate our job.

The only issue we observed here was the time taken to execute a test. More information can be found in the Pro Hadoop book [http://www.amazon.com/dp/B008PHZ3A2] and in the examples provided with Hadoop: The Definitive Guide. Hadoop also ships tests written using the same support classes; please check TestMapReduceLocal.java. There are many other utility classes provided with the Hadoop code that help in testing, such as MapReduceTestUtil.

4. Hadoop Inbuilt Counters

Counters help in quantitative analysis of a job. They provide aggregated statistics at the end of the run and hence can be used to validate the output. Hadoop provides built-in as well as user-defined counters. We can read them using the APIs in the driver class, and all counters are also listed at the end of the output logs.
The best thing about counters is that they work at the cluster level, i.e. they provide aggregated information across all the mappers and reducers.

Built-in Counters
Hadoop provides built-in counters that report information about each phase of a particular job.

A few important ones from a debugging and testing perspective:
MAP_INPUT_RECORDS — number of input records consumed by all the maps.
MAP_OUTPUT_RECORDS — number of output records produced by all the maps.
REDUCE_INPUT_RECORDS — number of input records consumed by all the reducers.
REDUCE_OUTPUT_RECORDS — number of output records produced by all the reducers.

User-Defined Java Counters
We can define our own counters to report the state of a job; their output takes the form of a map. There are two ways to create and access counters, viz. enums and Strings. Enums are easier and type-safe, and should be used when we know all the output states in advance; enum-based counters are best suited for cases like counting the number of requests per HTTP response code. String-based counters are dynamic and can be used when we don't have that visibility in advance, for example counting records per domain (see the sketch below).
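
As an illustration only (extractDomain() is a hypothetical helper, not part of the WordCount example below), a dynamic String-based counter inside a map() method could look like:

// Group and counter names are plain strings, so previously unseen domains simply create new counters.
String domain = extractDomain(value.toString());   // extractDomain() is a hypothetical helper
context.getCounter("DOMAIN_COUNTS", domain).increment(1);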

Example

Consider the simple WordCount example. Let's try to find out whether we are processing all the rows or not.

We will read the built-in "MAP_INPUT_RECORDS" counter to know how many rows were presented as input, and use two enum counters to count the number of null and non-null rows.

public class WordCount {

	public static class TokenizerMapper
	extends Mapper<Object, Text, Text, IntWritable>{

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context
		) throws IOException, InterruptedException {
			//incrementing the counters
			if(value == null || value.toString().equals("")) {
				context.getCounter(State.NULL).increment(1);
			}else {
				context.getCounter(State.NOT_NULL).increment(1);
			}
			StringTokenizer itr = new StringTokenizer(value.toString());

			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer
	extends Reducer<Text,IntWritable,Text,IntWritable> {
	private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values,
		Context context
		) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	//defining the enum
	enum State {
		NULL,
		NOT_NULL
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

		Job job = new Job(conf, "word count");
		//for brevity purpose full job config not shown
		job.waitForCompletion(true);
		//reading all the counters
		long inputCount =
		job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter",
		"MAP_INPUT_RECORDS").getValue();
		System.out.println("Total Input Rows ::::"+inputCount);
		System.out.println("Not Null Rows ==="    +job.getCounters().findCounter
		(State.NOT_NULL).getValue());
		System.out.println(" Null Rows ==="    +job.getCounters().findCounter
		(State.NULL).getValue());
		System.exit( 0);
	}

}

The null and non-null row counts can be summed and compared with MAP_INPUT_RECORDS to make sure that all rows were processed. This is the simplest of examples; the nice thing is that we only have to inspect a few counters to analyse the job state. The example runs on Hadoop 1.0.3.

Counters are best suited for scenarios where we want to validate output at an aggregated level, e.g. whether all rows were processed.

We agree that from a purist point of view this doesn't qualify as a unit testing method; it merely minimizes the information to check in order to validate the job. Still, it is a very simple and informative first-level validation of the results. An important point to note is that this method requires the job to be executed on a Hadoop setup.

For reference, an exhaustive list of the built-in counters can be found in Hadoop: The Definitive Guide.

5. LocalJobRunner to debug jobs using local filesystem

LocalJobRunner is more helpful for debugging a job than for testing it. It runs MapReduce jobs in a single JVM, which can therefore easily be debugged from the IDE, and it lets us run the job against the local filesystem.

To enable job execution using LocalJobRunner, set
conf.set("mapred.job.tracker", "local");

If we also want to use the local filesystem for input/output, set
conf.set("fs.default.name", "local");
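
Putting the two settings together, a minimal driver sketch for local debugging (using the same Hadoop 1.x property names) would be:

Configuration conf = new Configuration();
// Run the whole job in-process with LocalJobRunner so IDE breakpoints are hit.
conf.set("mapred.job.tracker", "local");
// Read input and write output on the local filesystem ("file:///" is the non-deprecated form of "local").
conf.set("fs.default.name", "file:///");

Job job = new Job(conf, "word count (local debug)");
// ...the rest of the job configuration is exactly the same as for a cluster run.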

There are a few limitations to this approach, such as a single reducer and the absence of a distributed environment, but it makes debugging a job very easy.

Conclusion

We are big fans of TDD and hope this post helps in understanding the various techniques for testing MapReduce jobs. Not all of these tests may be necessary, but each has a different capability that helps us mature our solution: a few require a cluster, a few require mocking, a few can be executed from the IDE, a few are very fast and a few test the complete solution.

References:

MRUnit Tutorial [https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+Tutorial]
Hadoop MapReduce Tutorial [http://hadoop.apache.org/docs/r0.20.203.0/mapred_tutorial.html]
Hadoop: The Definitive Guide, by Tom White [http://shop.oreilly.com/product/9780596521981.do]
Pro Hadoop [http://www.amazon.com/dp/B008PHZ3A2]