Monitoring Spring Data jobs with Spring Batch Admin UI

We can launch Spring Data jobs using Spring Batch, as explained in my previous post. The Spring Integration + Spring Batch + Spring Data ETL pipeline can be monitored using Spring Batch Admin, much like the Oozie web app displays job status.

Changes to the previous post's example to attach Spring Batch Admin:

    • The job repository in the last example used in-memory persistence. To enable the Admin UI we need a shared database, so let us configure the app with a MySQL database. Changes in application-context.xml:

<beans:bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <beans:property name="dataSource" ref="dataSource"/>
    <beans:property name="transactionManager" ref="transactionManager" />
</beans:bean>

	<beans:bean name="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<beans:property name="driverClassName" value="com.mysql.jdbc.Driver"/>
		<beans:property name="url" value="jdbc:mysql://localhost:3306/spring1"/>
		<beans:property name="username" value="root"/>
		<beans:property name="password" value="impetus"/>
	</beans:bean>

  • Add the MySQL client driver JAR to pom.xml.
       <dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.24</version>
	</dependency>

  • Expand the WAR file in Tomcat's webapps directory. You will find batch-hsql.properties and batch-mysql.properties in the WEB-INF/classes folder. Change them to match our configuration.
  • Start Tomcat and hit the URL to load this web app. This will generate all the relevant Spring Batch metadata tables in MySQL. The same tables will be used by our app whenever a job is executed.

Spring Integration + Batch + Hive as ETL

Building an end-to-end pipeline to analyze data on Hadoop is a very common problem.

People have predominantly solved this by applying ETL to get the data into the Hadoop ecosystem, often using a technology stack of Apache Flume, an Oozie coordinator, and Hive/Pig [https://github.com/cloudera/cdh-twitter-example]. We were designing a similar solution in which web logs were frequently collected into a data staging directory, and a few Hive queries needed to be executed on those logs to show reports in Tableau. Binding together so many different technologies was not appealing, so we solved the same problem using Spring Integration + Spring Batch + Spring Data [HiveRunner]. In this stack all the layers are Spring based, and hence easy to integrate, maintain, test, and modify.

Solution Steps:

    • Copy data to HDFS using a Spring Integration file inbound adapter, an HDFS outbound adapter, and a polling mechanism. This can be achieved by adding the following to the application context:

<int:channel id="filesChannel"/>
<file:inbound-channel-adapter id="localFileAdapter"
                              channel="filesChannel"
                              directory="<local path>"
                              filename-pattern="<file pattern>"> <!-- e.g. *.log -->
	<int:poller id="poller" fixed-delay="5000" />
</file:inbound-channel-adapter>

<int:outbound-channel-adapter id="hdfsAdapter"
				 channel="filesChannel"  ref="hiveBatchFlow"/>
 
    • Configure the Spring Batch job which invokes the hiveTasklet:
         <beans:bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>
	<beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
	<beans:bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher" p:jobRepository-ref="jobRepository"/>

	<beans:bean class="org.springframework.batch.core.scope.StepScope">
		<beans:property name="proxyTargetClass" value="true"/>
	</beans:bean>

	<batch:job id="job1">
		<batch:step id="hive">
			<batch:tasklet ref="hiveTasklet" />
		</batch:step>
	</batch:job>
         
    • Configure the bean which is invoked when a new file lands in the local file system directory. This class holds references to the batch job and the jobLauncher, and should extend AbstractReplyProducingMessageHandler:
<beans:bean id="hiveBatchFlow"
	      class="com.example.HiveBatchFlowHandler">
		<beans:constructor-arg value="<hdfs location>"/>
		<beans:constructor-arg ref="hadoopConfiguration"/>
		<beans:constructor-arg ref="jobLauncher"/>
		<beans:constructor-arg ref="job1"/>
	</beans:bean>
    • This class should override protected Object handleRequestMessage(Message<?> requestMessage).
      In this method we can invoke the Spring Batch job:
           jobLauncher.run(job, new JobParametersBuilder().toJobParameters());
      A fuller sketch of the handler is shown below.
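
A minimal sketch of com.example.HiveBatchFlowHandler along the lines of the bean definition above; the field names, the File payload cast, the timestamp job parameter, and the error handling are illustrative assumptions, and the imports assume Spring Integration 2.x/3.x (where Message lives in org.springframework.integration):

package com.example;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.integration.Message;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;

public class HiveBatchFlowHandler extends AbstractReplyProducingMessageHandler {

    private final String hdfsLocation;               // <hdfs location> constructor-arg
    private final Configuration hadoopConfiguration; // hadoopConfiguration bean
    private final JobLauncher jobLauncher;
    private final Job job;                           // job1

    public HiveBatchFlowHandler(String hdfsLocation, Configuration hadoopConfiguration,
                                JobLauncher jobLauncher, Job job) {
        this.hdfsLocation = hdfsLocation;
        this.hadoopConfiguration = hadoopConfiguration;
        this.jobLauncher = jobLauncher;
        this.job = job;
    }

    @Override
    protected Object handleRequestMessage(Message<?> requestMessage) {
        try {
            // The file inbound adapter delivers the picked-up local file as the payload.
            File localFile = (File) requestMessage.getPayload();

            // Copy the local file into the configured HDFS directory.
            FileSystem fs = FileSystem.get(hadoopConfiguration);
            fs.copyFromLocalFile(new Path(localFile.getAbsolutePath()),
                                 new Path(hdfsLocation, localFile.getName()));

            // Launch the batch job that runs the Hive step; a timestamp parameter
            // makes every launch a new job instance.
            jobLauncher.run(job, new JobParametersBuilder()
                    .addLong("run.ts", System.currentTimeMillis())
                    .toJobParameters());

            // No reply is needed when the handler is used from an outbound channel adapter.
            return null;
        }
        catch (Exception e) {
            throw new IllegalStateException("Hive batch flow failed for message " + requestMessage, e);
        }
    }
}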
         
  • So far we have added a poller on a local directory which picks up any file matching the configured filename pattern. When such a file is found, the HiveBatchFlowHandler is invoked and control passes to handleRequestMessage. Here we can fine-tune the HDFS directory path, copy the file to HDFS using the Spring for Apache Hadoop FileSystem support, and then launch the Spring Batch job which invokes the Hive step, i.e. executes the Hive script. To complete the picture we need to configure the Hive client and the Hive tasklet. This can be done by adding the following to the application context:
<hdp:configuration id="hadoopConfiguration">
	fs.default.name=${hd.fs}
</hdp:configuration>
<hdp:hive-client-factory host="${hive.host}" port="${hive.port}"/>

	<!-- the tasklet is the same as the one shared on the Spring Data Hive page -->

	<hdp:hive-tasklet id="hiveTasklet">
		<hdp:script location="apache-log-simple.hql">
			<arguments>
				hiveContribJar=${app.repo}/hive-contrib-0.8.1.jar
				localInPath="${app.home}/data/apache.log"
			</arguments>
		</hdp:script>
	</hdp:hive-tasklet>
         
  • We used a Maven build very similar to the examples provided in the Spring Data project.
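
Once the application is built, running the pipeline simply means starting the Spring application context; the file poller then drives the rest of the flow. A minimal launcher sketch, assuming the beans above are declared in an application-context.xml on the classpath (the file name and class name are illustrative):

import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class HiveEtlMain {

    public static void main(String[] args) throws InterruptedException {
        // Boot the Spring context; the Spring Integration poller starts with it.
        AbstractApplicationContext context =
                new ClassPathXmlApplicationContext("application-context.xml");
        context.registerShutdownHook();

        // Keep the JVM alive so the poller keeps picking up new files.
        Thread.currentThread().join();
    }
}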

I will try to put the source code on GitHub as soon as possible.