Monitoring Spring Data jobs with Spring Batch Admin UI

We can launch Spring Data jobs using Spring Batch; this was explained in my previous post. The Spring Integration + Spring Batch + Spring Data ETL can be monitored using Spring Batch Admin, much like the Oozie web app displays job status.

Changes needed in the previous post's example to attach Spring Batch Admin:

    • The job repository in the last example used in-memory persistence. To enable the Admin UI we need a shared database, so let us configure the app with a MySQL database. Changes in application-context.xml:

<beans:bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <beans:property name="dataSource" ref="dataSource"/>
    <beans:property name="transactionManager" ref="transactionManager" />
	</beans:bean>
	

	<beans:bean name="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<beans:property name="driverClassName" value="com.mysql.jdbc.Driver"/>
		<beans:property name="url" value="jdbc:mysql://localhost:3306/spring1"/>
		<beans:property name="username" value="root"/>
		<beans:property name="password" value="impetus"/>
	</beans:bean>

  • Add the MySQL client driver JAR to pom.xml:
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.24</version>
        </dependency>

  • Expand the Spring Batch Admin WAR file in Tomcat's webapps directory. You will find batch-hsql.properties and batch-mysql.properties in the WEB-INF/classes folder. Change them to match our configuration (see the sketch after this list).
  • Start Tomcat and hit the URL to load the webapp. This will generate all the relevant Spring Batch tables in MySQL. The same tables will be used by our app when a job is executed.
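For reference, a minimal sketch of the MySQL entries in batch-mysql.properties, assuming the batch.jdbc.* keys shipped with the Spring Batch Admin distribution and reusing the database and credentials from the dataSource bean above:

# batch-mysql.properties (sketch) -- keys assumed from the Spring Batch Admin distribution,
# values copied from the dataSource configured above
batch.jdbc.driver=com.mysql.jdbc.Driver
batch.jdbc.url=jdbc:mysql://localhost:3306/spring1
batch.jdbc.user=root
batch.jdbc.password=impetus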

Spring Integration + Batch + Hive as ETL

Building an end-to-end pipeline to analyze data on Hadoop is a very common problem.

People have predominantly been solving this problem by applying ETL to get the data into the Hadoop ecosystem, typically with a technology stack of Apache Flume, an Oozie coordinator, and Hive/Pig [https://github.com/cloudera/cdh-twitter-example]. We were designing a similar solution in which weblogs were frequently collected into a data staging directory, and a few Hive queries needed to be executed on those logs to show reports in Tableau. Using so many different technologies and binding them together was not appealing. We solved the same problem using Spring Integration + Spring Batch + Spring Data [HiveRunner]. In this stack all the layers are Spring based, so they can be easily integrated and, above that, are easy to maintain, test, and modify.

Solution Steps:

    • Copy data to HDFS using a Spring Integration file inbound adapter, an HDFS outbound adapter, and a polling mechanism. This can be achieved by adding the following to the application context:

<int:channel id="filesChannel"/>
<file:inbound-channel-adapter id="localFileAdapter"
				 channel="filesChannel"
	                          directory="<local path>"
	                          filename-pattern="<file pattern>"> ---*.log
	<int:poller id="poller" fixed-delay="5000"  />
</file:inbound-channel-adapter>

<int:outbound-channel-adapter id="hdfsAdapter"
				 channel="filesChannel"  ref="hiveBatchFlow"/>
 
    • Configure the Spring Batch job which invokes the hiveTasklet:
         <beans:bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>
	<beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
	<beans:bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher" p:jobRepository-ref="jobRepository"/>

	<beans:bean class="org.springframework.batch.core.scope.StepScope">
		<beans:property name="proxyTargetClass" value="true"/>
	</beans:bean>

	<batch:job id="job1">
		<batch:step id="hive">
			<batch:tasklet ref="hiveTasklet" />
		</batch:step>
	</batch:job>
         
    • Configure the bean which is invoked when a new file is placed into the local file system directory. This class holds references to the batch job and the jobLauncher, and should extend AbstractReplyProducingMessageHandler.
<beans:bean id="hiveBatchFlow"
	      class="com.example.HiveBatchFlowHandler">
		<beans:constructor-arg value="<hdfs location>"/>
		<beans:constructor-arg ref="hadoopConfiguration"/>
		<beans:constructor-arg ref="jobLauncher"/>
		<beans:constructor-arg ref="job1"/>
	</beans:bean>
    • This class should override protected Object handleRequestMessage(Message<?> requestMessage).
      In this method we can invoke the Spring Batch job (a minimal sketch of such a handler is shown after this list):
           jobLauncher.run(job, new JobParametersBuilder().toJobParameters());
         
  • So far we are able to add a poller to a local directory which picks up any file matching the configured filename pattern. When such a file is found, the HiveBatchFlowHandler is invoked and control passes to handleRequestMessage. Here we can fine-tune the HDFS directory path; after copying the file to HDFS using the Spring Hadoop FileSystem class, we launch the Spring Batch job, which invokes a Hive step, i.e., executes the Hive script. To complete the picture we need to configure the Hive client and the Hive tasklet. This can be done by adding the following to the application context:
           <hdp:configuration id="hadoopConfiguration">
		fs.default.name=${hd.fs}
	</hdp:configuration>
<hdp:hive-client-factory host="${hive.host}" port="${hive.port}"/>

	<!-- the tasklet is same as shared on spring Data Hive page-->

	<hdp:hive-tasklet id="hiveTasklet">
		<hdp:script location="apache-log-simple.hql">
			<arguments>
				hiveContribJar=${app.repo}/hive-contrib-0.8.1.jar
				localInPath="${app.home}/data/apache.log"
			</arguments>
		</hdp:script>
	</hdp:hive-tasklet>
         
  • We used a Maven build very similar to the examples provided with the Spring Data project.
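For completeness, here is a minimal sketch of the HiveBatchFlowHandler referenced in the steps above. The class name and constructor arguments follow the bean definition; the body (copying the payload file to HDFS and adding unique job parameters) is only an assumed implementation, not the original source.

// Sketch of the handler wired in as "hiveBatchFlow" above (Spring Integration 2.x-era packages assumed).
package com.example;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.integration.Message;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;

public class HiveBatchFlowHandler extends AbstractReplyProducingMessageHandler {

    private final String hdfsLocation;
    private final Configuration hadoopConfiguration;
    private final JobLauncher jobLauncher;
    private final Job job;

    public HiveBatchFlowHandler(String hdfsLocation, Configuration hadoopConfiguration,
                                JobLauncher jobLauncher, Job job) {
        this.hdfsLocation = hdfsLocation;
        this.hadoopConfiguration = hadoopConfiguration;
        this.jobLauncher = jobLauncher;
        this.job = job;
    }

    @Override
    protected Object handleRequestMessage(Message<?> requestMessage) {
        try {
            // The file inbound adapter delivers the new local file as the message payload.
            File localFile = (File) requestMessage.getPayload();

            // Copy the file to HDFS using the Hadoop FileSystem API.
            FileSystem fs = FileSystem.get(hadoopConfiguration);
            fs.copyFromLocalFile(new Path(localFile.getAbsolutePath()),
                                 new Path(hdfsLocation, localFile.getName()));

            // Launch the batch job that runs the Hive tasklet. A unique parameter is added so
            // every incoming file creates a new job instance.
            jobLauncher.run(job, new JobParametersBuilder()
                    .addString("inputFile", localFile.getName())
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters());
        } catch (Exception e) {
            throw new IllegalStateException("Hive batch flow failed", e);
        }
        // No reply is needed; returning null ends the flow here.
        return null;
    }
}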

I will try to put the source code on GitHub as soon as possible.

Thread-safe access to HBase Tables using HTablePool and Spring HBase

Accessing an HBase table via HTable is not thread-safe.

In order to access an HTable instance from multiple threads, HTablePool or HbaseTemplate should be used. The latter is the DAO-pattern support provided by Spring; HTablePool is what the Apache HBase client library suggests.

Let's first discuss the HTablePool way.

HTablePool is not just a simple pool of HTable objects; it handles thread-local objects as well. The pool can be initialized in either reusable or thread-local mode, and by supporting ThreadLocal mode it takes away the pain of using ThreadLocal objects in application code.

In order to initialize the HTablePool as a thread-local pool, use the HTablePool constructor that takes a PoolType. When PoolType is set to ThreadLocal it binds the pooled resource to the thread from which it is invoked.
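A minimal sketch of that usage, assuming the HBase 0.94-era client API (HTablePool, HTableInterface, PoolMap.PoolType) and hypothetical table and column-family names:

// Sketch only: thread-local HTablePool plus the borrow/use/close boilerplate.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;

public class UserWriter {

    private final HTablePool pool;

    public UserWriter() {
        Configuration conf = HBaseConfiguration.create();
        // PoolType.ThreadLocal binds one HTable per table name to the calling thread.
        this.pool = new HTablePool(conf, 10, PoolType.ThreadLocal);
    }

    public void save(String id, String email) throws IOException {
        HTableInterface table = pool.getTable("users");   // borrow from the pool
        try {
            Put put = new Put(Bytes.toBytes(id));
            put.add(Bytes.toBytes("cfInfo"), Bytes.toBytes("email"), Bytes.toBytes(email));
            table.put(put);
        } finally {
            table.close();   // returns the table to the pool
        }
    }
}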

This has been suggested as the de facto way to access HTable, but here we have to write boilerplate code to access the pool, i.e., get the HTable, close the resources, and handle checked exceptions. In short, we miss the support of Spring DAO.

The same thing can be achieved using the Spring Data Hadoop HBase module. The Spring Data module provides the HbaseTemplate class, which is thread-safe. It encapsulates all the boilerplate and provides the familiar Spring exception conversion. Example usage can be found at https://github.com/SpringSource/spring-data-book/tree/master/hadoop/hbase. This example has one issue in application-context.xml: we need to set the ZooKeeper properties on the HBaseConfiguration object, which is missing there.
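A minimal sketch of that fix, assuming the hdp namespace from Spring for Apache Hadoop and placeholder ZooKeeper host/port properties:

<!-- sketch: add the ZooKeeper settings to the HBase configuration; the placeholders are assumptions -->
<hdp:hbase-configuration configuration-ref="hadoopConfiguration">
    hbase.zookeeper.quorum=${hbase.zk.quorum}
    hbase.zookeeper.property.clientPort=${hbase.zk.port}
</hdp:hbase-configuration>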

The only issue with this approach is that it keeps creating and destroying HTable objects with every method call, which effectively negates the benefit of pooling. In order to avoid recreating HTable objects one should use Spring's HbaseSynchronizationManager. It binds the HTable to the calling thread, thus introducing the concept of thread-local objects: each subsequent call made through HbaseTemplate is aware of the bound table and will use it instead of retrieving a new instance. It can be set manually or through interceptors (AOP) using HbaseInterceptor. A manual-binding example can be found in TestTemplate. Using an interceptor may affect performance.
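A minimal HbaseTemplate usage sketch under those assumptions (hypothetical table and column-family names):

// Sketch: all connection handling, exception translation and, when a table is bound to the
// thread via HbaseSynchronizationManager, table reuse happen inside the template.
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.TableCallback;

public class UserDao {

    private final HbaseTemplate hbaseTemplate;

    public UserDao(HbaseTemplate hbaseTemplate) {
        this.hbaseTemplate = hbaseTemplate;
    }

    public void save(final String id, final String email) {
        hbaseTemplate.execute("users", new TableCallback<Object>() {
            @Override
            public Object doInTable(HTableInterface table) throws Throwable {
                Put put = new Put(Bytes.toBytes(id));
                put.add(Bytes.toBytes("cfInfo"), Bytes.toBytes("email"), Bytes.toBytes(email));
                table.put(put);
                return null;
            }
        });
    }
}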

The Spring way provides the same benefits as we get from using Spring DAO for an RDBMS.