Inserting large data into Hive dynamic partitions

A dynamically partitioned Hive table helps store raw data in partitioned form, which is useful for further querying.

A SELECT query generally runs faster against a partitioned table, so it is always advisable to create an external table over the raw file in HDFS and then insert that data into a partitioned table.

But when we try to insert a really large file into a dynamically partitioned table, the insert often fails because too many files are opened at the mapper stage.

The following steps solve the issue of inserting massive data into a dynamically partitioned Hive table:

Let's say the table is defined as:

CREATE TABLE IF NOT EXISTS stocks_main(
  exchange STRING,
  symbol STRING,
  price_open FLOAT,
  price_high FLOAT,
  price_low FLOAT,
  price_close FLOAT,
  volume INT,
  price_adj_close FLOAT)
PARTITIONED BY (ymd STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/impadmin/NYSE_PARTITIONED';

Here, stocks is an external table created over the raw data in HDFS with the same schema.

Now we need to pull data from the stocks table into the partitioned stocks_main table.

We need to set the following properties to enable dynamic-partition inserts:

SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.parallel=false;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;

Execute the following query to insert into the table:

INSERT INTO TABLE stocks_main PARTITION (ymd)
SELECT exchange, symbol, price_open, price_high, price_low, price_close, volume, price_adj_close, ymd
FROM stocks;

This runs as a map-only job, and when too many partition files are opened at once, the insert operation fails.

This can be fixed by moving the file-writing operation to the reducer stage, i.e. by executing the above query with a DISTRIBUTE BY clause:

INSERT INTO TABLE stocks_main PARTITION (ymd)
SELECT exchange, symbol, price_open, price_high, price_low, price_close, volume, price_adj_close, ymd
FROM stocks
DISTRIBUTE BY ymd;

This converts the map-only job into a full MapReduce job, so each reducer writes the files for its own partitions, which fixes the problem of writing huge data into the Hive table.
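
For completeness, the same session settings and the DISTRIBUTE BY insert can also be run programmatically. The sketch below is only illustrative and uses the Hive JDBC driver; the HiveServer2 URL, the user name and the backtick-quoting of the exchange column are assumptions to adjust for your environment.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class StocksPartitionedInsert {
    public static void main(String[] args) throws Exception {
        // Assumes a HiveServer2 endpoint; adjust host, port, database and credentials.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection(
                     "jdbc:hive2://localhost:10000/default", "impadmin", "");
             Statement stmt = con.createStatement()) {

            // Same session-level settings as above.
            stmt.execute("SET hive.exec.dynamic.partition.mode=nonstrict");
            stmt.execute("SET hive.exec.parallel=false");
            stmt.execute("SET hive.exec.max.dynamic.partitions=100000");
            stmt.execute("SET hive.exec.max.dynamic.partitions.pernode=100000");

            // DISTRIBUTE BY ymd sends all rows of a partition to the same reducer,
            // so partition files are written at the reduce stage, not the map stage.
            // `exchange` is backtick-quoted in case it is reserved in your Hive version.
            stmt.execute("INSERT INTO TABLE stocks_main PARTITION (ymd) "
                    + "SELECT `exchange`, symbol, price_open, price_high, price_low, "
                    + "price_close, volume, price_adj_close, ymd "
                    + "FROM stocks DISTRIBUTE BY ymd");
        }
    }
}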

 

Spring Integration + Batch + Hive as ETL

Building an end-to-end pipeline to analyze data on Hadoop is a very common problem.

People have predominantly solved this problem by applying ETL to get the data into the Hadoop ecosystem, for example with a technology stack of Apache Flume, an Oozie coordinator and Hive/Pig [https://github.com/cloudera/cdh-twitter-example].

We were designing a similar solution in which weblogs were frequently collected into a data staging directory, and a few Hive queries needed to be executed on those logs to produce reports in Tableau. Using too many different technologies and binding them together was not appealing, so we solved the problem using Spring Integration + Spring Batch + Spring Data [HiveRunner]. In this stack all the layers are Spring based, so they integrate easily and, above that, are easy to maintain, test and modify.

Solution Steps:

    • Copy data to HDFS using a Spring Integration file inbound adapter, an outbound channel adapter and a polling mechanism. This can be achieved by adding the following to the application context:

<int:channel id="filesChannel"/>

<file:inbound-channel-adapter id="localFileAdapter"
                              channel="filesChannel"
                              directory="<local path>"
                              filename-pattern="<file pattern>">  <!-- e.g. *.log -->
    <int:poller id="poller" fixed-delay="5000"/>
</file:inbound-channel-adapter>

<int:outbound-channel-adapter id="hdfsAdapter"
                              channel="filesChannel" ref="hiveBatchFlow"/>
 
    • Configure the Spring Batch job which invokes the hiveTasklet:
         <beans:bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>
	<beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
	<beans:bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher" p:jobRepository-ref="jobRepository"/>

	<beans:bean class="org.springframework.batch.core.scope.StepScope">
		<beans:property name="proxyTargetClass" value="true"/>
	</beans:bean>

	<batch:job id="job1">
		<batch:step id="hive">
			<batch:tasklet ref="hiveTasklet" />
		</batch:step>
	</batch:job>
         
    • Configure the bean that is invoked when a new file is placed in the local file system directory. This class holds references to the batch job and the jobLauncher, and should extend AbstractReplyProducingMessageHandler:
<beans:bean id="hiveBatchFlow"
	      class="com.example.HiveBatchFlowHandler">
		<beans:constructor-arg value="<hdfs location>"/>
		<beans:constructor-arg ref="hadoopConfiguration"/>
		<beans:constructor-arg ref="jobLauncher"/>
		<beans:constructor-arg ref="job1"/>
	</beans:bean>
    • This class should override protected Object handleRequestMessage(Message<?> requestMessage).
      In this method we can invoke the Spring Batch job (a fuller sketch of this handler is shown after this list):
           jobLauncher.run(job, new JobParametersBuilder().toJobParameters());
         
    • So far we are able to add a poller to the local directory which picks up any file matching the configured filename pattern. When such a file is found, the HiveBatchFlowHandler is invoked and control passes to handleRequestMessage. Here we can fine-tune the HDFS directory path, and after copying the file to HDFS using the Hadoop FileSystem class we launch the Spring Batch job, which invokes a Hive step that executes the Hive script. To complete the picture we need to configure the Hive client and the Hive tasklet. This can be done by adding the following to the application context:
<hdp:configuration id="hadoopConfiguration">
    fs.default.name=${hd.fs}
</hdp:configuration>

<hdp:hive-client-factory host="${hive.host}" port="${hive.port}"/>

<!-- the tasklet is the same as the one shared on the Spring Data Hive page -->
<hdp:hive-tasklet id="hiveTasklet">
    <hdp:script location="apache-log-simple.hql">
        <arguments>
            hiveContribJar=${app.repo}/hive-contrib-0.8.1.jar
            localInPath="${app.home}/data/apache.log"
        </arguments>
    </hdp:script>
</hdp:hive-tasklet>
         
    • We used a Maven build very similar to the examples provided with the Spring Data project.
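
To make the handler concrete, here is a minimal sketch of the HiveBatchFlowHandler referenced in the configuration above. It is illustrative only: the field names, the run.ts job parameter and the error handling are my assumptions, and on newer Spring Integration versions the Message type comes from org.springframework.messaging rather than org.springframework.integration.

package com.example;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.integration.Message;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;

public class HiveBatchFlowHandler extends AbstractReplyProducingMessageHandler {

    private final String hdfsDir;               // target HDFS directory (constructor arg 1)
    private final Configuration configuration;  // hadoopConfiguration bean (constructor arg 2)
    private final JobLauncher jobLauncher;      // constructor arg 3
    private final Job job;                      // job1 (constructor arg 4)

    public HiveBatchFlowHandler(String hdfsDir, Configuration configuration,
                                JobLauncher jobLauncher, Job job) {
        this.hdfsDir = hdfsDir;
        this.configuration = configuration;
        this.jobLauncher = jobLauncher;
        this.job = job;
    }

    @Override
    protected Object handleRequestMessage(Message<?> requestMessage) {
        // The file inbound adapter delivers the newly detected file as the payload.
        File localFile = (File) requestMessage.getPayload();
        try {
            // Copy the staged file to HDFS using the Hadoop FileSystem API.
            FileSystem fs = FileSystem.get(configuration);
            fs.copyFromLocalFile(new Path(localFile.getAbsolutePath()),
                                 new Path(hdfsDir, localFile.getName()));

            // Launch the batch job whose single step runs the Hive tasklet.
            // A timestamp parameter makes every launch a new job instance.
            jobLauncher.run(job, new JobParametersBuilder()
                    .addLong("run.ts", System.currentTimeMillis())
                    .toJobParameters());
        } catch (Exception e) {
            throw new IllegalStateException("Hive ETL flow failed for " + localFile, e);
        }
        // Returning null suppresses the reply, so the handler works as a
        // one-way endpoint behind the outbound-channel-adapter shown above.
        return null;
    }
}

With this in place, starting the application context (for example from a small main class using ClassPathXmlApplicationContext) is enough: the poller watches the staging directory, each new file is copied to HDFS, and the Hive script is executed by the batch job.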

I will try to put the source code on GitHub as soon as possible.