Skip to content

Commit 3e327eb

Browse files
committed
Introduce ChunkTaskExecutorItemWriter to support local chunking
Resolves #5021
1 parent 52875e7 commit 3e327eb

File tree

11 files changed

+340
-12
lines changed

11 files changed

+340
-12
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProcessor.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,31 @@
1616

1717
package org.springframework.batch.core.step.item;
1818

19-
import org.jspecify.annotations.NullUnmarked;
20-
2119
import org.springframework.batch.core.step.StepContribution;
2220
import org.springframework.batch.infrastructure.item.Chunk;
2321

2422
/**
2523
* Interface defined for processing {@link Chunk}s.
2624
*
25+
* @author Mahmoud Ben Hassine
2726
* @author Kyeonghoon Lee (Add FunctionalInterface annotation)
2827
* @since 2.0
29-
* @deprecated Since 6.0 with no replacement. Scheduled for removal in 7.0.
3028
*/
31-
@NullUnmarked
32-
@Deprecated(since = "6.0", forRemoval = true)
3329
@FunctionalInterface
3430
public interface ChunkProcessor<I> {
3531

36-
void process(StepContribution contribution, Chunk<I> chunk) throws Exception;
32+
@Deprecated(since = "6.0", forRemoval = true)
33+
default void process(StepContribution contribution, Chunk<I> chunk) throws Exception {
34+
process(chunk, contribution);
35+
}
36+
37+
/**
38+
* Process the given chunk and update the contribution.
39+
* @param chunk the chunk to process
40+
* @param contribution the current step contribution
41+
* @throws Exception if there is any error during processing
42+
* @since 6.0
43+
*/
44+
void process(Chunk<I> chunk, StepContribution contribution) throws Exception;
3745

3846
}

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ protected void writeItems(Chunk<O> items) throws Exception {
212212
}
213213

214214
@Override
215-
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
215+
public final void process(Chunk<I> inputs, StepContribution contribution) throws Exception {
216216

217217
// Allow temporary state to be stored in the user data field
218218
initializeUserData(inputs);

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2023 the original author or authors.
2+
* Copyright 2006-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -52,7 +52,7 @@ public Chunk<String> provide(StepContribution contribution) throws Exception {
5252
@Override
5353
public void postProcess(StepContribution contribution, Chunk<String> chunk) {
5454
}
55-
}, (contribution, chunk) -> contribution.incrementWriteCount(1));
55+
}, (chunk, contribution) -> contribution.incrementWriteCount(1));
5656
StepContribution contribution = new StepContribution(
5757
new StepExecution("foo", new JobExecution(1L, new JobInstance(123L, "job"), new JobParameters())));
5858
handler.execute(contribution, context);
@@ -95,7 +95,7 @@ public Chunk<String> provide(StepContribution contribution) throws Exception {
9595
@Override
9696
public void postProcess(StepContribution contribution, Chunk<String> chunk) {
9797
}
98-
}, (contribution, chunk) -> contribution.incrementWriteCount(1));
98+
}, (chunk, contribution) -> contribution.incrementWriteCount(1));
9999
StepContribution contribution = new StepContribution(
100100
new StepExecution("foo", new JobExecution(1L, new JobInstance(123L, "job"), new JobParameters())));
101101
ExitStatus expected = contribution.getExitStatus();
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.integration.chunk;
17+
18+
import org.springframework.batch.core.ExitStatus;
19+
import org.springframework.batch.core.step.StepContribution;
20+
import org.springframework.batch.core.step.StepExecution;
21+
import org.springframework.batch.core.listener.StepExecutionListener;
22+
import org.springframework.batch.core.step.item.ChunkProcessor;
23+
import org.springframework.batch.infrastructure.item.Chunk;
24+
import org.springframework.batch.infrastructure.item.ItemWriter;
25+
import org.springframework.core.task.TaskExecutor;
26+
27+
import java.util.ArrayList;
28+
import java.util.Collection;
29+
import java.util.HashSet;
30+
import java.util.List;
31+
import java.util.Set;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.Future;
34+
import java.util.concurrent.FutureTask;
35+
36+
/**
37+
* Similar to {@code ChunkMessageChannelItemWriter}, this item writer submits chunk
38+
* requests to local workers from a {@link TaskExecutor} instead of sending them over a
39+
* message channel to remote workers.
40+
*
41+
* @param <T> type of items
42+
* @see ChunkMessageChannelItemWriter
43+
* @author Mahmoud Ben Hassine
44+
* @since 6.0
45+
*/
46+
public class ChunkTaskExecutorItemWriter<T> implements ItemWriter<T>, StepExecutionListener {
47+
48+
@SuppressWarnings("NullAway.Init")
49+
private StepExecution stepExecution;
50+
51+
private int sequence;
52+
53+
private final TaskExecutor taskExecutor;
54+
55+
private final ChunkProcessorChunkRequestHandler<T> chunkProcessorChunkHandler = new ChunkProcessorChunkRequestHandler<>();
56+
57+
private final Set<Future<ChunkResponse>> responses = new HashSet<>();
58+
59+
/**
60+
* Create a new {@link ChunkTaskExecutorItemWriter}.
61+
* @param chunkRequestProcessor the chunk processor to process chunks
62+
* @param taskExecutor the task executor to submit chunk processing tasks to
63+
*/
64+
public ChunkTaskExecutorItemWriter(ChunkProcessor<T> chunkRequestProcessor, TaskExecutor taskExecutor) {
65+
this.taskExecutor = taskExecutor;
66+
this.chunkProcessorChunkHandler.setChunkProcessor(chunkRequestProcessor);
67+
}
68+
69+
@Override
70+
public void write(Chunk<? extends T> chunk) {
71+
ChunkRequest<T> request = new ChunkRequest<>(++sequence, chunk, this.stepExecution.getJobExecutionId(),
72+
this.stepExecution.createStepContribution());
73+
FutureTask<ChunkResponse> chunkResponseFutureTask = new FutureTask<>(
74+
() -> this.chunkProcessorChunkHandler.handle(request));
75+
this.responses.add(chunkResponseFutureTask);
76+
this.taskExecutor.execute(chunkResponseFutureTask);
77+
}
78+
79+
@Override
80+
public void beforeStep(StepExecution stepExecution) {
81+
this.stepExecution = stepExecution;
82+
}
83+
84+
@Override
85+
public ExitStatus afterStep(StepExecution stepExecution) {
86+
try {
87+
for (StepContribution contribution : getStepContributions()) {
88+
stepExecution.apply(contribution);
89+
}
90+
}
91+
catch (ExecutionException | InterruptedException e) {
92+
return ExitStatus.FAILED.addExitDescription(e);
93+
}
94+
return ExitStatus.COMPLETED.addExitDescription("Waited for " + this.responses.size() + " results.");
95+
}
96+
97+
private Collection<StepContribution> getStepContributions() throws ExecutionException, InterruptedException {
98+
List<StepContribution> contributions = new ArrayList<>();
99+
for (Future<ChunkResponse> task : this.responses) {
100+
contributions.add(task.get().getStepContribution());
101+
}
102+
return contributions;
103+
}
104+
105+
}

spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/ChunkProcessorChunkRequestHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class ChunkProcessorChunkRequestHandlerTests {
3333
@Test
3434
void testVanillaHandleChunk() throws Exception {
3535
// given
36-
handler.setChunkProcessor((contribution, chunk) -> count += chunk.size());
36+
handler.setChunkProcessor((chunk, contribution) -> count += chunk.size());
3737
StepContribution stepContribution = MetaDataInstanceFactory.createStepExecution().createStepContribution();
3838
Chunk items = Chunk.of("foo", "bar");
3939
ChunkRequest chunkRequest = new ChunkRequest<>(0, items, 12L, stepContribution);
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.samples.chunking.local;
17+
18+
import javax.sql.DataSource;
19+
20+
import org.springframework.batch.core.ExitStatus;
21+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
22+
import org.springframework.batch.core.configuration.annotation.StepScope;
23+
import org.springframework.batch.core.job.Job;
24+
import org.springframework.batch.core.job.builder.JobBuilder;
25+
import org.springframework.batch.core.repository.JobRepository;
26+
import org.springframework.batch.core.step.Step;
27+
import org.springframework.batch.core.step.builder.StepBuilder;
28+
import org.springframework.batch.core.step.item.ChunkProcessor;
29+
import org.springframework.batch.infrastructure.item.database.JdbcBatchItemWriter;
30+
import org.springframework.batch.infrastructure.item.database.builder.JdbcBatchItemWriterBuilder;
31+
import org.springframework.batch.infrastructure.item.file.FlatFileItemReader;
32+
import org.springframework.batch.infrastructure.item.file.builder.FlatFileItemReaderBuilder;
33+
import org.springframework.batch.integration.chunk.ChunkTaskExecutorItemWriter;
34+
import org.springframework.batch.samples.common.DataSourceConfiguration;
35+
import org.springframework.beans.factory.annotation.Value;
36+
import org.springframework.context.annotation.Bean;
37+
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.context.annotation.Import;
39+
import org.springframework.core.io.Resource;
40+
import org.springframework.jdbc.support.JdbcTransactionManager;
41+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
42+
import org.springframework.transaction.support.TransactionTemplate;
43+
44+
@Configuration
45+
@EnableBatchProcessing
46+
@Import(DataSourceConfiguration.class)
47+
public class LocalChunkingJobConfiguration {
48+
49+
public record Vet(String firstname, String lastname) {
50+
}
51+
52+
@Bean
53+
public Job job(JobRepository jobRepository, Step chunkingStep) {
54+
return new JobBuilder("job", jobRepository).start(chunkingStep).build();
55+
}
56+
57+
@Bean
58+
public Step chunkingStep(JobRepository jobRepository, JdbcTransactionManager transactionManager,
59+
FlatFileItemReader<Vet> itemReader, ChunkTaskExecutorItemWriter<Vet> itemWriter) {
60+
return new StepBuilder("chunkingStep", jobRepository).<Vet, Vet>chunk(2)
61+
.transactionManager(transactionManager)
62+
.reader(itemReader)
63+
.writer(itemWriter)
64+
.build();
65+
}
66+
67+
@Bean
68+
@StepScope
69+
public FlatFileItemReader<Vet> itemReader(@Value("#{jobParameters['inputFile']}") Resource resource) {
70+
return new FlatFileItemReaderBuilder<Vet>().name("vetItemReader")
71+
.resource(resource)
72+
.delimited()
73+
.names("firstname", "lastname")
74+
.targetType(Vet.class)
75+
.build();
76+
}
77+
78+
@Bean
79+
public ChunkTaskExecutorItemWriter<Vet> itemWriter(ChunkProcessor<Vet> chunkProcessor) {
80+
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
81+
taskExecutor.setCorePoolSize(4);
82+
taskExecutor.setThreadNamePrefix("worker-thread-");
83+
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
84+
taskExecutor.afterPropertiesSet();
85+
return new ChunkTaskExecutorItemWriter<>(chunkProcessor, taskExecutor);
86+
}
87+
88+
@Bean
89+
public ChunkProcessor<Vet> chunkProcessor(DataSource dataSource, TransactionTemplate transactionTemplate) {
90+
String sql = "insert into vets (firstname, lastname) values (?, ?)";
91+
JdbcBatchItemWriter<Vet> itemWriter = new JdbcBatchItemWriterBuilder<Vet>().dataSource(dataSource)
92+
.sql(sql)
93+
.itemPreparedStatementSetter((item, ps) -> {
94+
ps.setString(1, item.firstname());
95+
ps.setString(2, item.lastname());
96+
})
97+
.build();
98+
99+
return (chunk, contribution) -> transactionTemplate.executeWithoutResult(transactionStatus -> {
100+
try {
101+
itemWriter.write(chunk);
102+
contribution.incrementWriteCount(chunk.size());
103+
contribution.setExitStatus(ExitStatus.COMPLETED);
104+
}
105+
catch (Exception e) {
106+
transactionStatus.setRollbackOnly();
107+
contribution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
108+
}
109+
});
110+
}
111+
112+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
## Local Chunking Sample
2+
3+
### About
4+
5+
This sample shows how to use the `ChunkTaskExecutorItemWriter` to write items in a
6+
multi-threaded way. The example uses a `ThreadPoolTaskExecutor` to write items
7+
in parallel with multiple threads, each one writing a chunk of items in its own transaction.
8+
9+
In this sample, data items of type `Vet` are read from a flat file and written to
10+
a relational database table. The `Vet` class is a simple domain object representing a
11+
veterinarian, with properties `firstName`, and `lastName`. The flat file contains records
12+
of veterinarians, and each record is mapped to a `Vet` object using a `FlatFileItemReader`.
13+
The `JdbcBatchItemWriter` is then used to persist these `Vet` objects into a relational database
14+
table named `vets`. The table has columns corresponding to the properties of the `Vet` class.
15+
16+
### Run the sample
17+
18+
You can run the sample from the command line as following:
19+
20+
```
21+
$>cd spring-batch-samples
22+
$>../mvnw -Dtest=LocalChunkingJobFunctionalTests#testLaunchJobWithJavaConfig test
23+
```

spring-batch-samples/src/main/java/org/springframework/batch/samples/common/DataSourceConfiguration.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,8 +20,10 @@
2020

2121
import org.springframework.context.annotation.Bean;
2222
import org.springframework.context.annotation.Configuration;
23+
import org.springframework.jdbc.core.JdbcTemplate;
2324
import org.springframework.jdbc.support.JdbcTransactionManager;
2425
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
26+
import org.springframework.transaction.support.TransactionTemplate;
2527

2628
/**
2729
* @author Mahmoud Ben Hassine
@@ -38,9 +40,19 @@ public DataSource dataSource() {
3840
.build();
3941
}
4042

43+
@Bean
44+
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
45+
return new JdbcTemplate(dataSource);
46+
}
47+
4148
@Bean
4249
public JdbcTransactionManager transactionManager(DataSource dataSource) {
4350
return new JdbcTransactionManager(dataSource);
4451
}
4552

53+
@Bean
54+
public TransactionTemplate transactionTemplate(JdbcTransactionManager transactionManager) {
55+
return new TransactionTemplate(transactionManager);
56+
}
57+
4658
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
foo1,bar1
2+
foo2,bar2
3+
foo3,bar3
4+
foo4,bar4
5+
foo5,bar5
6+
foo6,bar6

spring-batch-samples/src/main/resources/org/springframework/batch/samples/common/business-schema-hsqldb.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ DROP TABLE GAMES IF EXISTS;
1010
DROP TABLE PLAYER_SUMMARY IF EXISTS;
1111
DROP TABLE ERROR_LOG IF EXISTS;
1212
DROP TABLE OWNERS IF EXISTS;
13+
DROP TABLE VETS IF EXISTS;
1314

1415
-- Autogenerated: do not edit this file
1516

@@ -104,6 +105,12 @@ CREATE TABLE ERROR_LOG (
104105

105106
-- PetClinic sample tables
106107

108+
CREATE TABLE VETS (
109+
ID INTEGER IDENTITY PRIMARY KEY,
110+
FIRSTNAME VARCHAR(30),
111+
LASTNAME VARCHAR(30)
112+
);
113+
107114
CREATE TABLE OWNERS (
108115
ID INTEGER IDENTITY PRIMARY KEY,
109116
FIRSTNAME VARCHAR(30),

0 commit comments

Comments
 (0)