Closed
Description
If `throttleLimit` exceeds the `ThreadPoolExecutor` pool size plus the job queue capacity (and the default `RejectedExecutionHandler` is used), then the batch hangs when the job queue capacity in the executor is exceeded. Here is an example:
/**
 * Minimal reproduction configuration: a multi-threaded, chunk-oriented step whose
 * throttle limit (21) is larger than the task executor's pool size (10) plus its
 * queue capacity (10).
 */
@Configuration
@EnableBatchProcessing
public class GH3947a {

    /** Shared source of the integers 1..100; the step's reader polls one item per call. */
    private static final ConcurrentLinkedQueue<Integer> ITEMS =
            new ConcurrentLinkedQueue<>(
                    IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList()));

    /**
     * Abort policy that prints a marker line to standard output before delegating
     * to the inherited {@link ThreadPoolExecutor.AbortPolicy} behaviour.
     */
    private static final class LoggingAbortPolicy extends ThreadPoolExecutor.AbortPolicy {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            System.out.println("execution rejected");
            super.rejectedExecution(task, executor);
        }
    }

    /**
     * Builds the executor used by the step: 10 core threads, 10 max threads,
     * a queue of 10, daemon threads, and the logging abort policy.
     *
     * @return the configured task executor
     */
    @Bean
    ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setDaemon(true);
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(10);
        executor.setRejectedExecutionHandler(new LoggingAbortPolicy());
        return executor;
    }

    /**
     * Defines a single-step job that reads from {@link #ITEMS} in chunks of one,
     * passes each item through {@link #processor()}, prints the chunk, and runs
     * on {@link #taskExecutor()} with a throttle limit of 21.
     *
     * @param jobs factory used to create the job
     * @param steps factory used to create the step
     * @return the job named "job"
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(
                        steps.get("step")
                                .<Integer, Integer>chunk(1)
                                .reader(ITEMS::poll)
                                .processor(processor())
                                .writer(chunk -> System.out.println(chunk))
                                .taskExecutor(taskExecutor())
                                .throttleLimit(21)
                                .build())
                .build();
    }

    /**
     * Creates a pass-through processor that sleeps for one second per item.
     *
     * @return a processor returning its input unchanged after the delay
     */
    private ItemProcessor<Integer, Integer> processor() {
        return item -> {
            TimeUnit.SECONDS.sleep(1);
            return item;
        };
    }

    /**
     * Boots the application context from this configuration and launches the job
     * with empty job parameters.
     *
     * @param args unused
     * @throws Exception if launching the job fails
     */
    public static void main(String[] args) throws Exception {
        ApplicationContext ctx = new AnnotationConfigApplicationContext(GH3947a.class);
        ctx.getBean(JobLauncher.class).run(ctx.getBean(Job.class), new JobParameters());
    }
}
The `RejectedExecutionException` is added to the `deferred` list in `RepeatTemplate.doHandle`, and the batch hangs waiting for results.