Spring Batch Integration - Grails Framework

Processing large data sets is often a complex task that software developers have to deal with, and it calls for capable tools and technologies. Although there are quite a few tools available for processing large data sets, Spring Batch stands out from the competition. Spring Batch is a lightweight and robust framework that can be easily integrated with the Grails framework. This blog post explains how Spring Batch ‘chunk oriented processing’ can be combined with the ‘Spring Integration’ module.

Spring Batch Processing

Spring Batch steps can process data in two different ways:

  • Chunk Oriented Processing
  • Tasklet Oriented Processing

This post also explains how to stop a running Spring Batch job promptly while it is processing a large data set, without compromising performance, and how to restart a failed or stopped job. Spring Integration provides a simple model for implementing complex enterprise integration solutions and enables asynchronous, message-driven behavior within a Spring-based application.

Chunk Oriented Processing

Chunk oriented processing reads data one item at a time and groups the items into ‘chunks’ that are written out within a transaction. Each item is read by an ‘ItemReader’ and handed to an ‘ItemProcessor’; the processed items are collected, and when the number of items read equals the commit interval, the entire chunk is passed to the ‘ItemWriter’ and the transaction is committed.

Chunk oriented processing is used when a step reads and writes data items. If a step does not need to read or write items, for example when it only sends an email or calls a stored procedure, tasklet oriented processing (a ‘Tasklet’ run by a ‘TaskletStep’) is used instead. The chunk oriented processing model has three important interfaces, ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’, which are part of the org.springframework.batch.item package.

  • ItemReader – The ‘ItemReader’ interface provides the data; it reads the items to be processed.
  • ItemProcessor – The ‘ItemProcessor’ interface is used for item transformation; it receives each input object from the ‘ItemReader’ and transforms it into an output object.
  • ItemWriter – The ‘ItemWriter’ interface is used for generic output operations. It writes the data transformed by the ‘ItemProcessor’, for example to a database, to memory, or to an output stream.
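The read-process-write cycle can be sketched without the framework. In the Java sketch below, ChunkReader, ChunkProcessor and ChunkWriter are simplified, hypothetical stand-ins for ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’ (they are not the Spring interfaces), and there is no real transaction: each writer call only marks the point where Spring Batch would commit the chunk.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-ins for ItemReader, ItemProcessor and ItemWriter.
interface ChunkReader<I> { I read(); }                  // returns null when input is exhausted
interface ChunkProcessor<I, O> { O process(I item); }   // returns null to filter an item out
interface ChunkWriter<O> { void write(List<O> chunk); } // receives a whole chunk at once

class ChunkLoop {
    // Reads and processes items one at a time and hands the outputs to the writer
    // each time commitInterval items have been read (plus a final partial chunk).
    // Returns the number of chunks written.
    static <I, O> int run(ChunkReader<I> reader, ChunkProcessor<I, O> processor,
                          ChunkWriter<O> writer, int commitInterval) {
        int chunks = 0;
        boolean exhausted = false;
        while (!exhausted) {
            List<O> chunk = new ArrayList<>();
            int read = 0;
            while (read < commitInterval) {
                I item = reader.read();
                if (item == null) { exhausted = true; break; }
                read++;
                O out = processor.process(item);
                if (out != null) chunk.add(out);
            }
            if (read > 0) {
                writer.write(chunk); // Spring Batch would commit the chunk transaction after this
                chunks++;
            }
        }
        return chunks;
    }

    // Adapts an Iterator to the read-until-null contract.
    static <T> ChunkReader<T> fromIterator(Iterator<T> it) {
        return () -> it.hasNext() ? it.next() : null;
    }
}
```

With five items and a commit interval of 2, the writer is called three times, with chunks of 2, 2 and 1 items.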

To illustrate the integration, we walk through a sample application, with code, that writes a report to a Microsoft Excel spreadsheet. The steps and code below were written for Grails 2.4; the configuration and steps should work in other Spring-based frameworks with minimal changes.

Step 1 – Resource.xml

The resource.xml file defines the Spring Batch beans, such as the transaction manager and the job repository, that establish the job context. It also defines the message channel, message handler and message endpoint beans for the message queue. Finally, the job explorer and job registry beans are used to stop running jobs and to restart failed or stopped jobs.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 
 xmlns:int="http://www.springframework.org/schema/integration"
 xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
 xmlns:batch="http://www.springframework.org/schema/batch"
 xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
 xmlns:task="http://www.springframework.org/schema/task"
 xmlns:aop="http://www.springframework.org/schema/aop"
 xmlns:tx="http://www.springframework.org/schema/tx"
 
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-4.1.xsd
 http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
 http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
 http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
 http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.2.xsd
 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">


<!-- **************************************************
 *Spring Batch stuff 
 ************************************************** -->

<bean id="batchTxManager"
 class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
 <property name="dataSource" ref="dataSource_rimdb" />
</bean>

<bean id="batchJobRepository"
 class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
 <property name="dataSource" ref="dataSource_rimdb" />
 <property name="transactionManager" ref="batchTxManager" />
 <property name="isolationLevelForCreate" value="ISOLATION_READ_COMMITTED" />
 <property name="validateTransactionState" value="false" />
</bean>

<!-- **************************************************
 * Spring Integration stuff 
 ************************************************** -->

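<!-- msgStoreQueryProvider is referenced below but not defined in this file: declare it as a
 ChannelMessageStoreQueryProvider bean matching your database (Spring Integration provides
 implementations in org.springframework.integration.jdbc.store.channel) -->
<bean id="msgStore" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">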
 <property name="dataSource" ref="dataSource_rimdb"/>
 <property name="channelMessageStoreQueryProvider" ref="msgStoreQueryProvider" />
</bean>

<!-- See also http://docs.spring.io/spring-integration/reference/htmlsingle/#channel-configuration-queuechannel
 and http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/system-management-chapter.html#message-store -->
<int:channel id="msgStoreChannel">
 <int:queue message-store="msgStore"/>
</int:channel>

<bean id="msgTxManager"
 class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
 <property name="dataSource" ref="dataSource_rimdb" />
</bean>

<bean id="msgHandler" class="com.test.batch.report.ReportJobDispatcher" />

<!-- We tie together the filter and polling consumer in a chain so we need not define
 additional channels. -->
<int:chain id="msgEndpointChain" input-channel="msgStoreChannel">

 <!-- Messages that are discarded because the thread pool is saturated are queued again
 by setting the discard-channel to our input message queue -->
 <int:filter id="msgFilter" discard-channel="msgStoreChannel">
 <bean class="com.test.batch.integration.MsgFilter" />
 </int:filter>
 
 <!-- See also http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/messaging-endpoints-chapter.html#endpoint
 Adjust as needed. max-messages-per-poll can also be increased, each message is still handled within its own transaction
 See also http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/transactions.html#transaction-poller -->
 <int:poller max-messages-per-poll="1" fixed-rate="1000">
 <int:transactional
 transaction-manager="msgTxManager"
 propagation="REQUIRED"
 isolation="DEFAULT"
 timeout="10000"
 read-only="false" />
 </int:poller>
 <int:outbound-channel-adapter id="msgOutboundChannelAdapter" ref="msgHandler" method="handleMessage" />
 
</int:chain>

<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
 <property name="dataSource" ref="dataSource_rimdb" />
</bean> 

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

</beans>

Step 2 – BatchReportConfig

BatchReportConfig is a simple class that looks up the beans defined in resource.xml from the Grails application context. It also gives classes that are not Grails artefacts (i.e. classes other than controllers and services, which do not receive dependency injection) access to these and other Grails beans.

The class also acts as a convenience wrapper for batch-related configuration: the rest of the application does not need to know about the Spring configuration, and access to Config.groovy is centralized in one place.

import com.test.ApplicationContextHolder
import org.codehaus.groovy.grails.web.mapping.LinkGenerator
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

class BatchReportConfig {
 private static JobRepository jobRepository
 private static PlatformTransactionManager txManager
 private static MessageChannel msgChannel
 private static ThreadPoolTaskExecutor taskExecutor
 private static LinkGenerator linkGenerator
 private static ConfigObject configObj
 private static JobExplorer jobExplorer
 private static JobRegistry jobRegistry

 private BatchReportConfig() {}
 
 // ****************************************************
 // Convenience methods for Spring Beans
 // ****************************************************
 
 public static synchronized MessageChannel getMessageChannel() {
 if (!msgChannel)
 msgChannel = ApplicationContextHolder.getBean("msgStoreChannel")
 
 return msgChannel
 }
 
 public static synchronized PlatformTransactionManager getTxManager() {
 if (!txManager)
 txManager = ApplicationContextHolder.getBean("batchTxManager")
 
 return txManager
 }
 
 public static synchronized JobRepository getJobRepository() {
 if (!jobRepository) {
 jobRepository = ApplicationContextHolder.getBean("batchJobRepository")
 }
 
 return jobRepository
 }
 
 public static synchronized JobExplorer getJobExplorer() {
 if (!jobExplorer)
 jobExplorer = ApplicationContextHolder.getBean("jobExplorer")
 
 return jobExplorer
 }
 
 public static synchronized JobRegistry getJobRegistry() {
 if (!jobRegistry)
 jobRegistry = ApplicationContextHolder.getBean("jobRegistry")
 
 return jobRegistry
 }
 
 public static synchronized LinkGenerator getLinkGenerator() {
 if (!linkGenerator)
 linkGenerator = ApplicationContextHolder.getBean("grailsLinkGenerator")
 
 return linkGenerator
 }
 
 
}
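The repeated lazy, synchronized lookups in BatchReportConfig all follow one pattern, which can be factored into a single method. The sketch below is a hypothetical Java version: BeanLookup is an illustrative name, and the Function passed to it stands in for ApplicationContextHolder.getBean(name), whose implementation is not shown in this post.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical generalization of BatchReportConfig's "if (!bean) bean = getBean(name)" blocks.
class BeanLookup {
    private final Function<String, Object> context; // resolves a bean by name
    private final Map<String, Object> cache = new HashMap<>();
    private int contextCalls = 0;

    BeanLookup(Function<String, Object> context) {
        this.context = context;
    }

    // Resolves the bean on first use and caches it; synchronized like the original getters.
    synchronized <T> T get(String name, Class<T> type) {
        Object bean = cache.get(name);
        if (bean == null) {
            contextCalls++;
            bean = context.apply(name);
            if (bean == null) throw new IllegalStateException("No bean named " + name);
            cache.put(name, bean);
        }
        return type.cast(bean);
    }

    // Number of times the underlying context was consulted.
    synchronized int contextCalls() {
        return contextCalls;
    }
}
```

Repeated calls for the same name hit the context only once, which is the behavior the individual getters in BatchReportConfig implement by hand.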

Step 3 – ReportJobFactory

ReportJobFactory is an abstract class whose abstract methods are overridden by subclasses to return the appropriate ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’. Its concrete ‘buildJob’ method combines the job parameters with these components to build the ReportJob class shown in Step 4. The reader is selected from the ‘inputSource’ job parameter: a list-based reader for list input, otherwise a file-based reader. The class must be subclassed for each new type of tool report.

import java.util.List
import java.util.Map

import com.test.batch.report.ReportJob
import com.test.batch.report.ReportJobConfig
import com.test.batch.integration.InputSource

import org.springframework.batch.core.JobParameters
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter

abstract class ReportJobFactory {
 private JobParameters jobParams
 private ReportJobConfig jobConfig
 private List inputList
 
 public ReportJobFactory(JobParameters jobParams, ReportJobConfig jobConfig, List inputList) {
 assert jobParams && jobConfig
 this.jobParams = jobParams
 this.jobConfig = jobConfig
 this.inputList = inputList
 }
 
 protected abstract ItemReader newListItemReader(JobParameters jobParams, List inputList)

 protected abstract ItemReader newFileItemReader(JobParameters jobParams)
 protected abstract ItemProcessor newItemProcessor(JobParameters jobParams)
 
 protected abstract ItemWriter newItemWriter(JobParameters jobParams)
 
 public ReportJob buildJob() {
 ItemReader itemReader
 if (InputSource.LIST.toString () == jobParams.getString("inputSource"))
 itemReader = newListItemReader(jobParams, inputList)
 else
 itemReader = newFileItemReader(jobParams)
 
 ItemProcessor itemProcessor = newItemProcessor(jobParams)
 ItemWriter itemWriter = newItemWriter(jobParams)

 return new ReportJob(jobConfig, jobParams, itemReader, itemProcessor, itemWriter)
 }
}
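ReportJobFactory is an instance of the template method pattern: the base class fixes how a job is assembled and subclasses supply the parts. Below is a framework-free Java sketch of the reader selection in buildJob(), assuming that InputSource.LIST.toString() yields "LIST" (the default for an enum constant) and using a Map and an Iterator in place of JobParameters and ItemReader. SketchJobFactory and DemoFactory are hypothetical names.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of ReportJobFactory: the base class decides which reader to use,
// subclasses decide how each reader is built.
abstract class SketchJobFactory {
    private final Map<String, String> params; // stands in for JobParameters
    private final List<String> inputList;

    SketchJobFactory(Map<String, String> params, List<String> inputList) {
        this.params = params;
        this.inputList = inputList;
    }

    protected abstract Iterator<String> newListReader(List<String> inputList);
    protected abstract Iterator<String> newFileReader(Map<String, String> params);

    // Same decision as buildJob(): inputSource == "LIST" selects the list reader.
    final Iterator<String> buildReader() {
        if ("LIST".equals(params.get("inputSource"))) {
            return newListReader(inputList);
        }
        return newFileReader(params);
    }
}

class DemoFactory extends SketchJobFactory {
    DemoFactory(Map<String, String> params, List<String> inputList) {
        super(params, inputList);
    }

    @Override
    protected Iterator<String> newListReader(List<String> inputList) {
        return inputList.iterator();
    }

    @Override
    protected Iterator<String> newFileReader(Map<String, String> params) {
        // A real subclass would open the input file; a fixed list keeps the sketch self-contained.
        return Arrays.asList("from-file").iterator();
    }
}
```

A new report type then only adds a subclass, while the assembly logic stays in one place.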

Step 4 – ReportJob

ReportJob is a wrapper class that encapsulates the Spring Batch job and step configuration for a report tool. It generalizes the necessary functionality, which makes it easy to add new tool reports: developers do not need in-depth knowledge of Spring Batch.

The wrapper class builds two steps:

  1. The first is the ‘report step’, which executes the ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’ with a chunk size (i.e. commit interval) that splits the data into chunks. The chunk size also largely determines how quickly a running job can be stopped.
  2. The second is an optional notification step that sends an email after the ‘report step’ has completed, with a success or failure message.

import com.test.batch.integration.ReportDelivery

import groovy.json.JsonBuilder
import java.util.concurrent.Executors

import org.apache.tomcat.jdbc.pool.DataSource
import org.springframework.batch.core.Job
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobExecutionListener
import org.springframework.batch.core.JobParameter
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.JobParametersBuilder
import org.springframework.batch.core.Step
import org.springframework.batch.core.job.SimpleJob
import org.springframework.batch.core.job.builder.JobBuilder
import org.springframework.batch.core.job.builder.SimpleJobBuilder
import org.springframework.batch.core.launch.support.RunIdIncrementer
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean
import org.springframework.batch.core.repository.support.SimpleJobRepository
import org.springframework.batch.core.step.builder.SimpleStepBuilder
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.core.step.builder.TaskletStepBuilder
import org.springframework.batch.core.step.tasklet.Tasklet
import org.springframework.batch.core.step.tasklet.TaskletStep
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter
import org.springframework.batch.item.support.IteratorItemReader

class ReportJob {
 private static final String REPORT_STEP_NAME = "report"
 private static final String NOTIFICATION_STEP_NAME = "notification"
 private static final String CLEANUP_STEP_NAME = "cleanup"
 
 private ReportJobConfig config
 private ItemReader itemReader
 private ItemProcessor itemProcessor
 private ItemWriter itemWriter
 
 private SimpleJob job
 private JobParameters jobParams
 
 public ReportJob(ReportJobConfig config, JobParameters jobParameters, ItemReader itemReader, ItemProcessor itemProcessor, ItemWriter itemWriter) {
 assert config && jobParameters && itemReader && itemProcessor && itemWriter 
 
 this.config = config
 this.jobParams = jobParameters
 this.itemReader = itemReader
 this.itemProcessor = itemProcessor
 this.itemWriter = itemWriter
 
 job = buildJob()
 job.registerJobExecutionListener(new ReportJobExecutionListener())
 }
 
 public Job getJob() {
 return job
 }
 
 public JobParameters getJobParameters() {
 return jobParams
 }
 
 public JobRepository getJobRepository() {
 return config.jobRepository
 }
 
 public String getToolName() {
 return jobParams.getString("toolName")
 }
 
 protected Step newReportStep() {
 SimpleStepBuilder stepBuilder = new StepBuilder(REPORT_STEP_NAME)
 .chunk(jobParams.getLong("chunkSize").intValue()) // commit interval; items are <String in, Map out>
 .reader(itemReader)
 .writer(itemWriter)
 .processor(itemProcessor)
 .repository(config.jobRepository)
 .transactionManager(config.txManager)
 .startLimit(3) // give up after 3 tries
 TaskletStep step = stepBuilder.build()
 return step
 }
 
 protected Step newNotificationStep() {
 Tasklet tasklet = new MailNotificationTasklet(jobParams.getString("userEmail"),jobParams.getString("ccEmail"), jobParams.getString("toolDescr"),
 jobParams.getString("jobRef"), config.fileDir, Enum.valueOf(ReportDelivery, jobParams.getString("reportDelivery")))
 
 TaskletStepBuilder stepBuilder = new StepBuilder(NOTIFICATION_STEP_NAME)
 .tasklet(tasklet)
 .repository(config.jobRepository)
 .transactionManager(config.txManager)
 
 TaskletStep step = stepBuilder.build()
 return step
 }
 
 
 
 
 protected SimpleJob buildJob() { 
 SimpleJobBuilder jobBuilder = new JobBuilder(jobParams.getString("toolName"))
 .start(newReportStep())
 .next(newNotificationStep())
 .repository(config.jobRepository)
 .incrementer(new RunIdIncrementer()) // only used by JobOperator.startNextInstance(); SimpleJobLauncher.run() does not use it
 
 return jobBuilder.build()
 }
}
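One behavior of the start(...).next(...) flow is worth keeping in mind: a SimpleJob runs its steps in order and ends the job at the first step that does not complete, so with this configuration the notification step runs only after a successful report step. A failure message would have to come from elsewhere, such as the registered ReportJobExecutionListener (not shown in this post). The framework-free Java sketch below models only that sequencing rule; SequentialJob is a hypothetical name, not a Spring class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical model of a sequential job: steps run in insertion order and the
// first failing step ends the job, so later steps never run.
class SequentialJob {
    private final Map<String, BooleanSupplier> steps = new LinkedHashMap<>();

    // The step body returns true on success, false on failure.
    SequentialJob step(String name, BooleanSupplier body) {
        steps.put(name, body);
        return this;
    }

    // Runs the steps and returns the names of the steps that actually ran.
    List<String> run() {
        List<String> ran = new ArrayList<>();
        for (Map.Entry<String, BooleanSupplier> step : steps.entrySet()) {
            ran.add(step.getKey());
            if (!step.getValue().getAsBoolean()) break; // a failed step ends the job
        }
        return ran;
    }
}
```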

Step 5 – ReportJobDispatcher

ReportJobDispatcher handles a single job request retrieved from the message queue. It is called by the polling outbound channel adapter inside a transaction, so if the method throws an exception (e.g. because the task executor is saturated), the transaction is rolled back and the message remains in the queue. For each request, it builds a new report job and immediately submits, restarts or stops it, depending on the request. If the launch fails because all executor threads are busy, an exception is raised so that the message is retained in the queue.

import com.test.batch.report.factory.ReportJobFactory
import com.test.batch.report.factory.ReportJobFactoryFactory
import com.test.batch.integration.ReportJobRequest
import org.springframework.batch.core.JobParameters

import groovy.json.JsonSlurper

import org.springframework.messaging.Message
import org.springframework.messaging.MessageHandler
import org.springframework.messaging.MessagingException

class ReportJobDispatcher implements MessageHandler {
 
 @Override
 public void handleMessage(Message<ReportJobRequest> message) throws MessagingException {
 assert message
 
 ReportJobRequest jobRequest = new ReportJobRequest(message.getPayload())

 dispatch(jobRequest)
 }

 protected void dispatch(ReportJobRequest jobRequest) { 
 assert jobRequest
 
 ReportJobConfig jobConfig = BatchReportConfig.newDefaultJobConfig()
 JobParameters jobParams = ReportJobParamsBuilder.build(jobRequest)
 
 ReportJobFactory reportJobFactory = ReportJobFactoryFactory.newReportJobFactory(jobParams, jobConfig, jobRequest.inputList)
 ReportJob reportJob = reportJobFactory.buildJob()
 if (jobRequest.jobExecutionId) {
 if (jobRequest.jobRquestType == "Restart") {
 ReportJobLauncher.restart(reportJob, jobRequest.jobExecutionId)
 } else if (jobRequest.jobRquestType == "Stop") {
 ReportJobLauncher.stop(reportJob, jobRequest.jobExecutionId)
 }
 // Any other request type with an execution id is ignored and the message is consumed
 } else {
 // No execution id: this is a request for a new job
 ReportJobLauncher.submit(reportJob)
 }
 }
 
 
}
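The routing in dispatch() can be pulled out into a pure function and unit-tested without Spring or a message queue. The Java sketch below mirrors the Groovy conditions, including Groovy truth, under which both a null and a 0 execution id count as a new job request. DispatchAction and DispatchRule are hypothetical names.

```java
// Hypothetical routing rule extracted from ReportJobDispatcher.dispatch().
enum DispatchAction { SUBMIT, RESTART, STOP, IGNORE }

class DispatchRule {
    static DispatchAction route(Long jobExecutionId, String requestType) {
        // Groovy truth treats both null and 0 as "no execution id"
        if (jobExecutionId == null || jobExecutionId == 0L) return DispatchAction.SUBMIT;
        if ("Restart".equals(requestType)) return DispatchAction.RESTART;
        if ("Stop".equals(requestType)) return DispatchAction.STOP;
        return DispatchAction.IGNORE; // other request types are not acted upon
    }
}
```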


Step 6 – ReportJobFactoryFactory

This is a factory class used for creating a report job factory based on the tool name.

// Import the concrete tool factories used in the switch below (AbcReportJobFactory, XyzReportJobFactory)
// if they live in a different package
import com.test.batch.report.ReportJobConfig

import org.springframework.batch.core.JobParameters

class ReportJobFactoryFactory {

 private ReportJobFactoryFactory() {} 
 public static ReportJobFactory newReportJobFactory(JobParameters jobParams, ReportJobConfig jobConfig, List inputList) {
 assert jobParams && jobConfig
 
 ReportJobFactory factory
 String toolName = jobParams.getString("toolName")
 switch(toolName) {
 case "abc":
 factory = new AbcReportJobFactory(jobParams, jobConfig, inputList)
 break
 case "xyz":
 factory = new XyzReportJobFactory(jobParams, jobConfig, inputList)
 break
 default:
 throw new IllegalArgumentException("Report factory for tool [${toolName}] not found")
 }
 
 assert factory
 return factory
 }
}
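If the number of tools grows, a map from tool name to constructor can replace the switch, so that adding a report means registering one entry. This is a hypothetical alternative, not part of the original design: ToolFactory stands in for ReportJobFactory, and the registry would be filled once at start-up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for ReportJobFactory.
interface ToolFactory { String toolName(); }

// Registry-based alternative to the switch in ReportJobFactoryFactory.
class ToolFactoryRegistry {
    private final Map<String, Supplier<ToolFactory>> byTool = new HashMap<>();

    void register(String toolName, Supplier<ToolFactory> constructor) {
        byTool.put(toolName, constructor);
    }

    ToolFactory newFactory(String toolName) {
        Supplier<ToolFactory> constructor = byTool.get(toolName);
        if (constructor == null) {
            // Same failure as the switch's default branch
            throw new IllegalArgumentException("Report factory for tool [" + toolName + "] not found");
        }
        return constructor.get();
    }
}
```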

Each job factory created by the class above extends ‘ReportJobFactory’ and overrides all of its abstract methods to return the appropriate ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’.

import java.util.List
import java.util.Map

import com.test.batch.report.PassthroughFlatFileItemReader
import com.test.batch.report.ReportJobConfig
import com.test.batch.report.factory.ReportJobFactory

import org.springframework.batch.core.JobParameters
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter
import org.springframework.batch.item.support.IteratorItemReader

class AbcReportJobFactory extends ReportJobFactory {
 
 public AbcReportJobFactory(JobParameters jobParams, ReportJobConfig jobConfig, List inputList) {
 super(jobParams, jobConfig, inputList)
 }
 
 @Override
 protected ItemReader newListItemReader(JobParameters jobParams, List inputList) {
 return new IteratorItemReader(inputList)
 }
 
 @Override
 protected ItemReader newFileItemReader(JobParameters jobParams) {
 PassthroughFlatFileItemReader<String> itemReader = new PassthroughFlatFileItemReader<String>()
 itemReader.setSaveState(false)
 return itemReader
 }
 
 @Override
 protected ItemProcessor newItemProcessor(JobParameters jobParams) {
 return new AbcItemProcessor()
 }
 
 @Override
 protected ItemWriter newItemWriter(JobParameters jobParams) {
 AbcLineItemWriter itemWriter = new AbcLineItemWriter()
 
 return itemWriter
 }

}

ItemReader

The ‘ItemReader’ returned here is either a generic ‘IteratorItemReader’ or a flat file reader (‘PassthroughFlatFileItemReader’ in the code above), chosen according to the input data:

  1. If the input data is a list, it returns an ‘IteratorItemReader’.
  2. If it is a text file, it returns the flat file reader.
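Both readers follow the same contract: one item per read() call and null once the input is exhausted. Below is a hypothetical Java line reader following that contract; LineItemReader is an illustrative name, and the line count it keeps is the kind of position a state-saving reader stores for restarts. AbcReportJobFactory calls setSaveState(false) on its file reader, so, assuming PassthroughFlatFileItemReader behaves like Spring's FlatFileItemReader, that position is not stored and a restarted job reads the file from the beginning.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;

// Hypothetical line-by-line reader following the ItemReader contract.
class LineItemReader {
    private final BufferedReader in;
    private int linesRead = 0; // the position a state-saving reader would persist

    LineItemReader(Reader source) {
        this.in = new BufferedReader(source);
    }

    // Returns the next line, or null to signal the end of the input.
    String read() {
        try {
            String line = in.readLine();
            if (line != null) linesRead++;
            return line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int linesRead() {
        return linesRead;
    }
}
```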

ItemProcessor

‘AbcItemProcessor’ implements the generic ‘ItemProcessor’ interface with a String as input and a data map as output. The transformation is implemented in the ‘process’ method, which is called for every item returned by the ‘ItemReader’.

import java.util.Map

import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.StepExecution
import org.springframework.batch.core.annotation.BeforeStep
import org.springframework.batch.item.ItemProcessor

class AbcItemProcessor implements ItemProcessor<String, Map> {
 private boolean showIpAddr, showIp6Addr
 
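 @BeforeStep
 void beforeStep(StepExecution stepExecution) {
 // Read step settings here, e.g. from stepExecution.getJobParameters()
 }

 @Override
 public Map process(String domain) throws Exception {
 // Placeholder: build and return the output row for this item (return null to filter it out)
 return [:]
 } 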
}
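To show the contract that the placeholder process() method has to fulfil, here is a hypothetical Java processor body that turns one domain name into a row map. DomainProcessor and the column names "domain" and "tld" are illustrative only. Returning null from ItemProcessor.process() is how Spring Batch filters an item out, so the item never reaches the ItemWriter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical String -> Map transformation in the shape of AbcItemProcessor.process().
class DomainProcessor {
    static Map<String, String> process(String domain) {
        if (domain == null || domain.trim().isEmpty()) {
            return null; // filtered: the writer will not see this item
        }
        String normalized = domain.trim().toLowerCase();
        Map<String, String> row = new HashMap<>();
        row.put("domain", normalized);
        int dot = normalized.lastIndexOf('.');
        row.put("tld", dot >= 0 ? normalized.substring(dot + 1) : "");
        return row;
    }
}
```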

ItemWriter

‘AbcLineItemWriter’ implements the generic ‘ItemWriter’ interface for data maps. Its ‘write’ method is called once per chunk, after the ‘ItemProcessor’ has processed the items of that chunk, and writes them to the output stream.

import com.test.batch.report.BatchReportFileUtils;

import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext
import org.springframework.batch.item.ItemWriter;

// Typed as Map so that it accepts the Map items returned by AbcItemProcessor
class AbcLineItemWriter implements ItemWriter<Map> {
 
 @BeforeStep
 void beforeStep(StepExecution stepExecution) {
 // Open the output (e.g. the spreadsheet file) here
 }
 
 @AfterStep
 void afterStep(StepExecution stepExecution) {
 // Close the output here
 }
 
 @Override
 public void write(List<? extends Map> lstNameServerMap) throws Exception {
 // Called once per chunk with all processed items of that chunk
 }

}
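A hypothetical body for such a chunk writer, in Java and writing plain CSV so that it needs only the JDK (the post's sample writes an Excel spreadsheet, which requires an additional library). Like AbcLineItemWriter.write(), it receives one whole chunk per call; CsvChunkWriter and the column names are illustrative.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.List;
import java.util.Map;

// Hypothetical chunk writer: one CSV line per row map, columns in a fixed order.
class CsvChunkWriter {
    private final Writer out;
    private final List<String> columns;

    CsvChunkWriter(Writer out, List<String> columns) {
        this.out = out;
        this.columns = columns;
    }

    // Called once per chunk. Values are written as-is (no quoting or escaping)
    // to keep the sketch short; missing values become empty fields.
    void write(List<? extends Map<String, ?>> chunk) {
        try {
            for (Map<String, ?> row : chunk) {
                StringBuilder line = new StringBuilder();
                for (int i = 0; i < columns.size(); i++) {
                    if (i > 0) line.append(',');
                    Object value = row.get(columns.get(i));
                    line.append(value == null ? "" : value);
                }
                out.write(line.append('\n').toString());
            }
            out.flush(); // one flush per chunk
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```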

ReportJobLauncher

ReportJobLauncher takes a report job and launches, restarts or stops it using a Spring Batch job launcher with the appropriate thread pool. Using one thread pool per tool keeps the implementation simple.

Submitting/Running a Job

The ‘submit’ method is synchronized so that the checks on the executor pool properties and the launch happen together (although there should never be more than one caller at a time). It is called within the transaction of the polling message handler that takes the job message from the queue. If it throws an exception, the transaction is rolled back and the message is retained in the queue for later processing.

For execution, we need a non-queuing thread pool with a maximum number of threads, i.e. direct hand-off. We do not want the thread pool to queue tasks, because its queue is not persistent; all persistent queuing is handled by the message queue. Such a pool can be built with a synchronous queue.

Note: If the maximum pool size is too small, the persistent message queue will grow over time. For further details, refer to the thread pool bean configuration in the Resource.xml section.

import java.util.Map;
import java.util.concurrent.BlockingQueue
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import com.test.ApplicationContextHolder

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.configuration.JobRegistry
import org.springframework.batch.core.configuration.support.MapJobRegistry
import org.springframework.batch.core.configuration.support.ReferenceJobFactory
import org.springframework.batch.core.explore.JobExplorer
import org.springframework.batch.core.launch.support.SimpleJobLauncher
import org.springframework.batch.core.launch.support.SimpleJobOperator
import org.springframework.batch.core.repository.JobRepository
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.batch.core.ExitStatus


class ReportJobLauncher {
 
 private static final Logger log = LoggerFactory.getLogger(ReportJobLauncher.class)
 
 public static synchronized void submit(ReportJob reportJob) {
 assert reportJob
 
 ThreadPoolTaskExecutor taskExecutor = ReportJobThreadPoolManager.getInstance().getExecutor(reportJob.getToolName()) 
 if (taskExecutor.getThreadPoolExecutor().isShutdown())
 throw new TaskRejectedException("Task executor is shut down")
 
 if (taskExecutor.getThreadPoolExecutor().getActiveCount() >= BatchReportConfig.getToolMaxThreads(reportJob.getToolName()))
 throw new TaskRejectedException("No threads for batch execution of tool [${reportJob.getToolName()}] available")
 
 SimpleJobLauncher launcher = new SimpleJobLauncher()
 launcher.setJobRepository(reportJob.getJobRepository())
 launcher.setTaskExecutor(taskExecutor)
 launcher.afterPropertiesSet()
 
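 JobExecution jobExecution = launcher.run(reportJob.getJob(), reportJob.getJobParameters())
 // With an asynchronous executor, SimpleJobLauncher does not throw when the task is
 // rejected; it marks the execution FAILED, so check the BatchStatus
 if (jobExecution.getStatus() == BatchStatus.FAILED)
 // We should never reach this because we check for available threads above
 throw new TaskRejectedException(jobExecution.getExitStatus().getExitDescription())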
 }
 }
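The direct hand-off pool described above can be tried with the JDK alone. This sketch assumes a fixed-size pool per tool (core size equal to maximum size); the actual settings of ReportJobThreadPoolManager are not shown in this post. HandOffPool and trySubmit are hypothetical names.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Direct hand-off pool: a SynchronousQueue holds no tasks, so once every thread is
// busy a new task is rejected instead of being queued in (non-persistent) memory.
class HandOffPool {
    static ThreadPoolExecutor create(int maxThreads) {
        return new ThreadPoolExecutor(
                maxThreads, maxThreads,             // fixed number of report threads
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),   // no in-memory task queue
                new ThreadPoolExecutor.AbortPolicy());
    }

    // Returns true if the task was accepted, false if the pool was saturated.
    static boolean trySubmit(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            // Spring's ThreadPoolTaskExecutor reports this as TaskRejectedException
            return false;
        }
    }
}
```

With one thread, a second task is rejected while the first is still running rather than waiting in memory; in ReportJobLauncher, such a rejection ends in an exception that rolls back the polling transaction and leaves the message in the persistent queue.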

Restarting a Job

Here is the code snippet to restart a failed/stopped job using the job execution id.

// Same imports as the submit snippet above


class ReportJobLauncher {
 
 private static final Logger log = LoggerFactory.getLogger(ReportJobLauncher.class)
 
 public static synchronized void restart(ReportJob reportJob, Long jobExecutionId) {
 assert reportJob && jobExecutionId

 // Restarting needs a free worker thread, so reject early if the pool is saturated
 ThreadPoolTaskExecutor taskExecutor = ReportJobThreadPoolManager.getInstance().getExecutor(reportJob.getToolName())
 if (taskExecutor.getThreadPoolExecutor().isShutdown())
 throw new TaskRejectedException("Task executor is shut down")

 if (taskExecutor.getThreadPoolExecutor().getActiveCount() >= BatchReportConfig.getToolMaxThreads(reportJob.getToolName())) {
 throw new TaskRejectedException("No threads for batch execution of tool [${reportJob.getToolName()}] available")
 }

 JobExplorer jobExplorer = BatchReportConfig.getJobExplorer()
 JobRepository jobRepository = BatchReportConfig.getJobRepository()
 JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId)

 // Only FAILED or STOPPED executions are restartable; restarting any other status would
 // throw, roll back the polling transaction and leave the message to be retried on every poll
 if (jobExecution && jobExecution.getStatus() in [BatchStatus.FAILED, BatchStatus.STOPPED]) {
 // SimpleJobOperator looks the job up by name, so register it first
 JobRegistry jobRegistry = new MapJobRegistry()
 jobRegistry.register(new ReferenceJobFactory(reportJob.getJob()))

 SimpleJobLauncher launcher = new SimpleJobLauncher()
 launcher.setJobRepository(jobRepository)
 launcher.setTaskExecutor(taskExecutor)
 launcher.afterPropertiesSet()

 SimpleJobOperator simpleJobOperator = new SimpleJobOperator()
 simpleJobOperator.setJobExplorer(jobExplorer)
 simpleJobOperator.setJobRegistry(jobRegistry)
 simpleJobOperator.setJobRepository(jobRepository)
 simpleJobOperator.setJobLauncher(launcher)
 simpleJobOperator.restart(jobExecution.getId())
 } else {
 log.warn("Job execution [${jobExecutionId}] not found or not restartable; request ignored")
 }
 // Other exceptions propagate, so the transaction rolls back and the message stays queued
 }
 }
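Whether an execution may be restarted or stopped depends on its BatchStatus. Checking that before calling the JobOperator keeps an invalid request from throwing inside the polling transaction, where the rollback would put the message back on the queue to be retried on every poll. The Java sketch below encodes those rules with a hypothetical ExecStatus enum that mirrors the values of Spring Batch's BatchStatus; ExecutionRules is also an illustrative name.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative mirror of Spring Batch's BatchStatus values (not the real enum).
enum ExecStatus { STARTING, STARTED, STOPPING, STOPPED, FAILED, COMPLETED, ABANDONED, UNKNOWN }

class ExecutionRules {
    // Spring Batch restarts FAILED or STOPPED executions; running, stopping, UNKNOWN and
    // ABANDONED executions, and completed job instances, are refused with an exception.
    private static final Set<ExecStatus> RESTARTABLE = EnumSet.of(ExecStatus.FAILED, ExecStatus.STOPPED);
    // SimpleJobOperator.stop() only accepts executions that are STARTING or STARTED.
    private static final Set<ExecStatus> STOPPABLE = EnumSet.of(ExecStatus.STARTING, ExecStatus.STARTED);

    static boolean canRestart(ExecStatus status) {
        return RESTARTABLE.contains(status);
    }

    static boolean canStop(ExecStatus status) {
        return STOPPABLE.contains(status);
    }
}
```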

Stopping a Job

To stop a job quickly, keep the chunk size (commit interval) as small as practical: a stop request takes effect only at a chunk boundary, so the chunk in progress is always finished first. The chunk size is read from the ‘chunkSize’ job parameter in the ‘ReportJob’ class when the report step is built.

Note: When writing to CSV, Excel or other files, a small chunk size works well. When writing to a database, however, a small commit interval hurts performance: the smaller the commit interval, the more database round trips and commits.

// Same imports as the submit snippet above


class ReportJobLauncher {
 
 private static final Logger log = LoggerFactory.getLogger(ReportJobLauncher.class)
 
 public static synchronized void stop(ReportJob reportJob, Long jobExecutionId) {
 assert reportJob && jobExecutionId

 // No free-thread check here: stopping does not run on the report thread pool, and
 // rejecting stop requests while the pool is saturated would block them exactly
 // when a long-running job needs to be stopped
 JobExplorer jobExplorer = BatchReportConfig.getJobExplorer()
 JobRepository jobRepository = BatchReportConfig.getJobRepository()
 JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId)

 // SimpleJobOperator.stop() only accepts STARTING or STARTED executions
 if (jobExecution && jobExecution.getStatus() in [BatchStatus.STARTING, BatchStatus.STARTED]) {
 JobRegistry jobRegistry = new MapJobRegistry()
 jobRegistry.register(new ReferenceJobFactory(reportJob.getJob()))

 SimpleJobOperator simpleJobOperator = new SimpleJobOperator()
 simpleJobOperator.setJobExplorer(jobExplorer)
 simpleJobOperator.setJobRegistry(jobRegistry)
 simpleJobOperator.setJobRepository(jobRepository)

 // Marks the execution STOPPING in the job repository; the running step picks
 // this up after its current chunk is committed and then stops
 simpleJobOperator.stop(jobExecution.getId())
 } else {
 log.warn("Job execution [${jobExecutionId}] not found or not running; stop request ignored")
 }
 }

}

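Next, create a utility class to submit, restart or stop a job. Instead of calling the launcher directly, the class below builds a job request and posts it to a Spring Integration message channel. The consumer of that channel picks up the request and passes it to the report job dispatcher, which starts, restarts or stops the job.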

import com.test.batch.report.BatchReportConfig

import org.springframework.messaging.MessageChannel
import org.springframework.messaging.support.GenericMessage

class IntegrationUtils {

    private IntegrationUtils() {}

    /**
     * Builds a report job request and posts it, as JSON, on the batch message channel.
     * The optional jobExecutionId and jobRquestType carry the details needed to
     * restart or stop an existing execution.
     */
    static ReportJobRequest postReportJobRequest(String toolName, String email, Date submissionDate, List items,
            String jobRef = null, Map params = null, Long jobExecutionId = null, String jobRquestType = null) {
        assert toolName && email && submissionDate

        ReportJobRequest jobRequest = new ReportJobRequest()
        jobRequest.submissionTime = submissionDate.getTime()
        jobRequest.toolName = toolName
        jobRequest.toolDescr = BatchReportConfig.getToolDescr(toolName)
        jobRequest.userEmail = email
        jobRequest.jobRef = jobRef
        jobRequest.toolParams = params
        // Items passed in directly are read from a list; otherwise input comes from a file.
        jobRequest.inputSource = items ? InputSource.LIST : InputSource.FILE
        jobRequest.inputList = items
        jobRequest.reportDelivery = ReportDelivery.LINK
        jobRequest.chunkSize = BatchReportConfig.getConfigObj().batch.report.chunkSize
        jobRequest.jobExecutionId = jobExecutionId
        jobRequest.jobRquestType = jobRquestType
        jobRequest.validate()

        // Publish the request; the subscriber on this channel dispatches the job.
        MessageChannel msgChannel = BatchReportConfig.getMessageChannel()
        msgChannel.send(new GenericMessage<String>(jobRequest.toJson()))

        return jobRequest
    }
}
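The hand-off described above separates whoever submits a report from the code that runs it. The sketch below models that routing in plain Java, under clear assumptions: a `BlockingQueue` stands in for the Spring Integration `MessageChannel`, and `JobRequestDispatchSketch`, `RequestType` and the action strings are hypothetical. A real dispatcher would call `SimpleJobLauncher.run`, `SimpleJobOperator.restart` or `SimpleJobOperator.stop` instead of recording strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a report job dispatcher consuming queued requests.
class JobRequestDispatchSketch {

    enum RequestType { START, RESTART, STOP }

    static final class JobRequest {
        final String toolName;
        final RequestType type;
        final Long jobExecutionId; // null for a fresh START

        JobRequest(String toolName, RequestType type, Long jobExecutionId) {
            // Mirrors the idea of validate(): restart and stop need an existing execution.
            if (type != RequestType.START && jobExecutionId == null) {
                throw new IllegalArgumentException(type + " requires a jobExecutionId");
            }
            this.toolName = toolName;
            this.type = type;
            this.jobExecutionId = jobExecutionId;
        }
    }

    /** Drains the queue in arrival order and returns the action chosen for each request. */
    static List<String> dispatchAll(BlockingQueue<JobRequest> queue) {
        List<String> actions = new ArrayList<>();
        JobRequest request;
        while ((request = queue.poll()) != null) {
            switch (request.type) {
                case START:
                    actions.add("launch " + request.toolName);
                    break;
                case RESTART:
                    actions.add("restart execution " + request.jobExecutionId);
                    break;
                case STOP:
                    actions.add("stop execution " + request.jobExecutionId);
                    break;
                default:
                    throw new IllegalStateException("Unknown request type: " + request.type);
            }
        }
        return actions;
    }
}
```

Queuing a START, then a STOP and a RESTART for execution 42 yields the actions "launch …", "stop execution 42" and "restart execution 42" in that order. Because the submitter only enqueues a message, it never has to hold a reference to the launcher or operator.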

Summing Up

This post explained how ‘chunk oriented processing’ can be integrated with the ‘Spring Integration’ module, and how to stop and restart a batch job. Feel free to share your comments and questions.


Sathish Jannarapu

Sathish Jannarapu is working as a Senior Technical Associate at Evoke Technologies. He is currently focused on Java/J2EE based technologies including Spring, Hibernate and Web Services. In his spare time Sathish likes listening to music and reading articles on emerging technologies.
