Skip to content

Allow customization of Spring Batch TaskExecutor #29296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.boot.autoconfigure.batch.BatchProperties.Isolation;
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

Expand All @@ -39,6 +40,7 @@
* @author Andy Wilkinson
* @author Kazuki Shimizu
* @author Stephane Nicoll
* @author Andreas Ahlenstorf
* @since 1.0.0
*/
public class BasicBatchConfigurer implements BatchConfigurer, InitializingBean {
Expand All @@ -51,6 +53,8 @@ public class BasicBatchConfigurer implements BatchConfigurer, InitializingBean {

private final TransactionManagerCustomizers transactionManagerCustomizers;

private final TaskExecutor taskExecutor;

private JobRepository jobRepository;

private JobLauncher jobLauncher;
Expand All @@ -63,12 +67,30 @@ public class BasicBatchConfigurer implements BatchConfigurer, InitializingBean {
* @param dataSource the underlying data source
* @param transactionManagerCustomizers transaction manager customizers (or
* {@code null})
* @deprecated since 2.7.0 for removal in 3.0.0 in favor of
* {@link #BasicBatchConfigurer(BatchProperties, DataSource, TransactionManagerCustomizers, TaskExecutor)}
*/
@Deprecated
protected BasicBatchConfigurer(BatchProperties properties, DataSource dataSource,
TransactionManagerCustomizers transactionManagerCustomizers) {
this(properties, dataSource, transactionManagerCustomizers, null);
}

/**
* Create a new {@link BasicBatchConfigurer} instance.
* @param properties the batch properties
* @param dataSource the underlying data source
* @param transactionManagerCustomizers transaction manager customizers (or
* {@code null})
* @param taskExecutor the executor to be used by {@link JobLauncher} (or
* {@code null})
*/
protected BasicBatchConfigurer(BatchProperties properties, DataSource dataSource,
TransactionManagerCustomizers transactionManagerCustomizers, TaskExecutor taskExecutor) {
this.properties = properties;
this.dataSource = dataSource;
this.transactionManagerCustomizers = transactionManagerCustomizers;
this.taskExecutor = taskExecutor;
}

@Override
Expand Down Expand Up @@ -120,6 +142,9 @@ protected JobExplorer createJobExplorer() throws Exception {
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
if (this.taskExecutor != null) {
jobLauncher.setTaskExecutor(this.taskExecutor);
}
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,12 +27,14 @@
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

/**
* Provide a {@link BatchConfigurer} according to the current environment.
*
* @author Stephane Nicoll
* @author Andreas Ahlenstorf
*/
@ConditionalOnClass(PlatformTransactionManager.class)
@ConditionalOnBean(DataSource.class)
Expand All @@ -47,9 +49,10 @@ static class JdbcBatchConfiguration {
@Bean
BasicBatchConfigurer batchConfigurer(BatchProperties properties, DataSource dataSource,
@BatchDataSource ObjectProvider<DataSource> batchDataSource,
ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers,
@BatchTaskExecutor ObjectProvider<TaskExecutor> batchTaskExecutor) {
return new BasicBatchConfigurer(properties, batchDataSource.getIfAvailable(() -> dataSource),
transactionManagerCustomizers.getIfAvailable());
transactionManagerCustomizers.getIfAvailable(), batchTaskExecutor.getIfAvailable());
}

}
Expand All @@ -63,9 +66,11 @@ static class JpaBatchConfiguration {
JpaBatchConfigurer batchConfigurer(BatchProperties properties, DataSource dataSource,
@BatchDataSource ObjectProvider<DataSource> batchDataSource,
ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers,
EntityManagerFactory entityManagerFactory) {
EntityManagerFactory entityManagerFactory,
@BatchTaskExecutor ObjectProvider<TaskExecutor> batchTaskExecutor) {
return new JpaBatchConfigurer(properties, batchDataSource.getIfAvailable(() -> dataSource),
transactionManagerCustomizers.getIfAvailable(), entityManagerFactory);
transactionManagerCustomizers.getIfAvailable(), entityManagerFactory,
batchTaskExecutor.getIfAvailable());
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.autoconfigure.batch;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.beans.factory.annotation.Qualifier;

/**
* Qualifier annotation for a TaskExecutor to be injected into Batch auto-configuration.
*
* @author Andreas Ahlenstorf
* @since 2.7.0
*/
@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Qualifier
public @interface BatchTaskExecutor {

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

import org.springframework.boot.autoconfigure.batch.BatchProperties.Isolation;
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
import org.springframework.core.task.TaskExecutor;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

/**
* A {@link BasicBatchConfigurer} tailored for JPA.
*
* @author Stephane Nicoll
* @author Andreas Ahlenstorf
* @since 2.0.0
*/
public class JpaBatchConfigurer extends BasicBatchConfigurer {
Expand All @@ -46,10 +48,30 @@ public class JpaBatchConfigurer extends BasicBatchConfigurer {
* @param transactionManagerCustomizers transaction manager customizers (or
* {@code null})
* @param entityManagerFactory the entity manager factory (or {@code null})
* @deprecated since 2.7.0 for removal in 3.0.0 in favor of
* {@link #JpaBatchConfigurer(BatchProperties, DataSource, TransactionManagerCustomizers, EntityManagerFactory, TaskExecutor)}
*/
@Deprecated
protected JpaBatchConfigurer(BatchProperties properties, DataSource dataSource,
TransactionManagerCustomizers transactionManagerCustomizers, EntityManagerFactory entityManagerFactory) {
super(properties, dataSource, transactionManagerCustomizers);
super(properties, dataSource, transactionManagerCustomizers, null);
this.entityManagerFactory = entityManagerFactory;
}

/**
* Create a new {@link BasicBatchConfigurer} instance.
* @param properties the batch properties
* @param dataSource the underlying data source
* @param transactionManagerCustomizers transaction manager customizers (or
* {@code null})
* @param entityManagerFactory the entity manager factory (or {@code null})
* @param taskExecutor the executor to be used by
* {@link org.springframework.batch.core.launch.JobLauncher} (or {@code null})
*/
protected JpaBatchConfigurer(BatchProperties properties, DataSource dataSource,
TransactionManagerCustomizers transactionManagerCustomizers, EntityManagerFactory entityManagerFactory,
TaskExecutor taskExecutor) {
super(properties, dataSource, transactionManagerCustomizers, taskExecutor);
this.entityManagerFactory = entityManagerFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
Expand All @@ -80,6 +82,7 @@
* @author Stephane Nicoll
* @author Vedran Pavic
* @author Kazuki Shimizu
* @author Andreas Ahlenstorf
*/
@ExtendWith(OutputCaptureExtension.class)
class BatchAutoConfigurationTests {
Expand Down Expand Up @@ -367,6 +370,37 @@ void whenTheUserDefinesTheirOwnDatabaseInitializerThenTheAutoConfiguredBatchInit
.hasBean("customInitializer"));
}

@Test
void jobLauncherUsesBatchTaskExecutorIfPresent() {
this.contextRunner.withUserConfiguration(TestConfiguration.class, EmbeddedDataSourceConfiguration.class,
BatchTaskExecutorConfiguration.class).run((context) -> {
assertThat(context).hasBean("batchTaskExecutor");

TaskExecutor taskExecutor = context.getBean("batchTaskExecutor", TaskExecutor.class);
BatchConfigurer batchConfigurer = context.getBean(BatchConfigurer.class);

assertThat(batchConfigurer.getJobLauncher()).hasFieldOrPropertyWithValue("taskExecutor",
taskExecutor);
});
}

@Test
void jobLauncherConfiguredByJpaBatchConfigurerUsesBatchTaskExecutorIfPresent() {
this.contextRunner.withUserConfiguration(TestConfiguration.class, EmbeddedDataSourceConfiguration.class,
HibernateJpaAutoConfiguration.class, BatchTaskExecutorConfiguration.class).run((context) -> {
PlatformTransactionManager transactionManager = context.getBean(PlatformTransactionManager.class);
assertThat(transactionManager.toString().contains("JpaTransactionManager")).isTrue();

assertThat(context).hasBean("batchTaskExecutor");

TaskExecutor taskExecutor = context.getBean("batchTaskExecutor", TaskExecutor.class);
BatchConfigurer batchConfigurer = context.getBean(BatchConfigurer.class);

assertThat(batchConfigurer.getJobLauncher()).hasFieldOrPropertyWithValue("taskExecutor",
taskExecutor);
});
}

@Configuration(proxyBeanMethods = false)
protected static class BatchDataSourceConfiguration {

Expand Down Expand Up @@ -531,4 +565,15 @@ DataSourceScriptDatabaseInitializer customInitializer(DataSource dataSource) {

}

@Configuration(proxyBeanMethods = false)
protected static class BatchTaskExecutorConfiguration {

@BatchTaskExecutor
@Bean
public TaskExecutor batchTaskExecutor() {
return new SyncTaskExecutor();
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -223,7 +223,7 @@ static class BatchConfiguration extends BasicBatchConfigurer {
private final DataSource dataSource;

protected BatchConfiguration(DataSource dataSource) {
super(new BatchProperties(), dataSource, new TransactionManagerCustomizers(null));
super(new BatchProperties(), dataSource, new TransactionManagerCustomizers(null), null);
this.dataSource = dataSource;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@
A number of questions often arise when people use Spring Batch from within a Spring Boot application.
This section addresses those questions.

[[howto.batch.architectural-considerations]]
=== Architectural Considerations

The default configuration of Spring Batch is geared towards single-job containerized Spring Boot apps orchestrated by a job scheduler like Kubernetes, optionally using https://dataflow.spring.io/[Spring Cloud Dataflow]:

* All `Jobs` in the application context are executed once on application startup; see <<#howto.batch.running-jobs-on-startup>>.
* Jobs are run on the same thread they are launched on.

However, it is also a valid option to run batch jobs as part of other applications, notably web applications running in servlet containers.
Example use cases include reporting, ad-hoc job running, and web application support.
See {spring-batch-docs}job.html#runningJobsFromWebContainer[the Spring Batch documentation] for how to do this.
You might need to <<#howto.batch.customizing-task-executor,customize the task executor for batch jobs>> if you opt for this approach.



[[howto.batch.specifying-a-data-source]]
Expand Down Expand Up @@ -57,3 +70,20 @@ This provides only one argument to the batch job: `someParameter=someValue`.
Spring Batch requires a data store for the `Job` repository.
If you use Spring Boot, you must use an actual database.
Note that it can be an in-memory database, see {spring-batch-docs}job.html#configuringJobRepository[Configuring a Job Repository].

[[howto.batch.customizing-task-executor]]
=== Customizing the TaskExecutor for Batch Jobs

By default, jobs are run on the same thread they are launched on.
See {spring-batch-api}/core/configuration/annotation/EnableBatchProcessing.html[the Javadoc of `@EnableBatchProcessing`] for details.
Depending on your type of application, this might be undesirable because the starting thread is blocked while the batch job runs.
Furthermore, additional jobs might have to wait for the currently running job to complete.
In such a case, it is necessary to customize the `TaskExecutor` used by Spring Batch.
The auto-configuration of Spring Batch automatically picks up any `TaskExecutor` annotated with `@BatchTaskExecutor`, for example:

[source,java,indent=0,subs="verbatim"]
----
include::{docs-java}/howto/batch/customizingtaskexecutor/BatchConfiguration.java[tag=bean]
----

The example configures a `TaskExecutor` that dynamically adapts it size and uses up to 4 threads with a maximum queue size of 10. Adjust this configuration to match the needs of your application.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.docs.howto.batch.customizingtaskexecutor;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.autoconfigure.batch.BatchTaskExecutor;
import org.springframework.boot.task.TaskExecutorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.TaskExecutor;

@EnableBatchProcessing
public class BatchConfiguration {

// tag::bean[]
@Bean
@BatchTaskExecutor
public TaskExecutor batchTaskExecutor(TaskExecutorBuilder builder) {
return builder.threadNamePrefix("batch-").corePoolSize(0).maxPoolSize(4).queueCapacity(10)
.allowCoreThreadTimeOut(true).awaitTermination(false).build();
}
// end::bean[]

}