Skip to content

Commit 0561396

Browse files
committed
BATCH-1767: fix optimistic locking exception in multi-threaded step
Currently, when the commit of the chunk fails in a multi-threaded step, the step execution is rolled back to a previous, eventually obsolete version. This previous version might be obsolete because it could be modified by another thread. In this case, a OptimisticLockingFailureException is thrown when trying to persist the step execution leaving the step in an UNKNOWN state while it should be FAILED. This commit fixes the issue by refreshing the step execution to the latest correctly persisted version before applying the step contribution so that the contribution is applied on a fresh correct state. Resolves BATCH-1767
1 parent ea6c312 commit 0561396

File tree

3 files changed

+284
-10
lines changed

3 files changed

+284
-10
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java

+23-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2014 the original author or authors.
2+
* Copyright 2006-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import org.apache.commons.logging.LogFactory;
2020
import org.springframework.batch.core.BatchStatus;
2121
import org.springframework.batch.core.ChunkListener;
22+
import org.springframework.batch.core.JobExecution;
2223
import org.springframework.batch.core.JobInterruptedException;
2324
import org.springframework.batch.core.StepContribution;
2425
import org.springframework.batch.core.StepExecution;
@@ -72,6 +73,7 @@
7273
* @author Robert Kasanicky
7374
* @author Michael Minella
7475
* @author Will Schipp
76+
* @author Mahmoud Ben Hassine
7577
*/
7678
@SuppressWarnings("serial")
7779
public class TaskletStep extends AbstractStep {
@@ -338,6 +340,7 @@ private class ChunkTransactionCallback extends TransactionSynchronizationAdapter
338340
private boolean stepExecutionUpdated = false;
339341

340342
private StepExecution oldVersion;
343+
private ExecutionContext oldExecutionContext;
341344

342345
private boolean locked = false;
343346

@@ -359,6 +362,7 @@ public void afterCompletion(int status) {
359362
logger.info("Commit failed while step execution data was already updated. "
360363
+ "Reverting to old version.");
361364
copy(oldVersion, stepExecution);
365+
stepExecution.setExecutionContext(oldExecutionContext);
362366
if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
363367
rollback(stepExecution);
364368
}
@@ -370,7 +374,6 @@ public void afterCompletion(int status) {
370374
logger.error("Rolling back with transaction in unknown state");
371375
rollback(stepExecution);
372376
stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
373-
stepExecution.setTerminateOnly();
374377
}
375378
}
376379
finally {
@@ -396,8 +399,7 @@ public RepeatStatus doInTransaction(TransactionStatus status) {
396399

397400
// In case we need to push it back to its old value
398401
// after a commit fails...
399-
oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
400-
copy(stepExecution, oldVersion);
402+
oldExecutionContext = new ExecutionContext(stepExecution.getExecutionContext());
401403

402404
try {
403405

@@ -432,6 +434,23 @@ public RepeatStatus doInTransaction(TransactionStatus status) {
432434
Thread.currentThread().interrupt();
433435
}
434436

437+
// Refresh stepExecution to the latest correctly persisted
438+
// state in order to apply the contribution on the latest version
439+
String stepName = stepExecution.getStepName();
440+
JobExecution jobExecution = stepExecution.getJobExecution();
441+
StepExecution lastStepExecution = getJobRepository()
442+
.getLastStepExecution(jobExecution.getJobInstance(), stepName);
443+
if (lastStepExecution != null &&
444+
!lastStepExecution.getVersion().equals(stepExecution.getVersion())) {
445+
copy(lastStepExecution, stepExecution);
446+
}
447+
448+
// Take a copy of the stepExecution in case we need to
449+
// undo the current contribution to the in memory instance
450+
// if the commit fails
451+
oldVersion = new StepExecution(stepName, jobExecution);
452+
copy(stepExecution, oldVersion);
453+
435454
// Apply the contribution to the step
436455
// even if unsuccessful
437456
if (logger.isDebugEnabled()) {
@@ -498,11 +517,9 @@ private void rollback(StepExecution stepExecution) {
498517
}
499518

500519
private void copy(final StepExecution source, final StepExecution target) {
501-
target.setVersion(source.getVersion());
502520
target.setWriteCount(source.getWriteCount());
503521
target.setFilterCount(source.getFilterCount());
504522
target.setCommitCount(source.getCommitCount());
505-
target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
506523
}
507524

508525
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.core.step.item;
17+
18+
import org.junit.Test;
19+
import org.springframework.batch.core.BatchStatus;
20+
import org.springframework.batch.core.Job;
21+
import org.springframework.batch.core.JobExecution;
22+
import org.springframework.batch.core.JobParameters;
23+
import org.springframework.batch.core.StepExecution;
24+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
25+
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
26+
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
27+
import org.springframework.batch.core.launch.JobLauncher;
28+
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
29+
import org.springframework.batch.core.step.tasklet.TaskletStep;
30+
import org.springframework.batch.item.ItemReader;
31+
import org.springframework.batch.item.ItemWriter;
32+
import org.springframework.beans.factory.annotation.Autowired;
33+
import org.springframework.context.ApplicationContext;
34+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
35+
import org.springframework.context.annotation.Bean;
36+
import org.springframework.context.annotation.Configuration;
37+
import org.springframework.context.annotation.Import;
38+
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
39+
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
40+
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
41+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
42+
import org.springframework.transaction.PlatformTransactionManager;
43+
import org.springframework.transaction.support.DefaultTransactionStatus;
44+
45+
import javax.sql.DataSource;
46+
import java.util.concurrent.atomic.AtomicInteger;
47+
48+
import static org.junit.Assert.assertEquals;
49+
import static org.junit.Assert.assertNotNull;
50+
51+
/**
52+
* Tests for the behavior of a multi-threaded TaskletStep.
53+
*
54+
* @author Mahmoud Ben Hassine
55+
*/
56+
public class MultiThreadedTaskletStepIntegrationTests {
57+
58+
@Test
59+
public void testMultiThreadedTaskletExecutionWhenNoErrors() throws Exception {
60+
// given
61+
Class[] configurationClasses = {JobConfiguration.class, TransactionManagerConfiguration.class};
62+
ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses);
63+
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
64+
Job job = context.getBean(Job.class);
65+
66+
// when
67+
JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
68+
69+
// then
70+
assertNotNull(jobExecution);
71+
assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
72+
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
73+
assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus());
74+
assertEquals(0, stepExecution.getFailureExceptions().size());
75+
}
76+
77+
@Test
78+
public void testMultiThreadedTaskletExecutionWhenCommitFails() throws Exception {
79+
// given
80+
Class[] configurationClasses = {JobConfiguration.class, CommitFailingTransactionManagerConfiguration.class};
81+
ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses);
82+
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
83+
Job job = context.getBean(Job.class);
84+
85+
// when
86+
JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
87+
88+
// then
89+
assertNotNull(jobExecution);
90+
assertEquals(BatchStatus.FAILED, jobExecution.getStatus());
91+
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
92+
assertEquals(BatchStatus.FAILED, stepExecution.getStatus());
93+
Throwable e = stepExecution.getFailureExceptions().get(0);
94+
assertEquals("Planned commit exception!", e.getMessage());
95+
// No assertions on execution context because it is undefined in this case
96+
}
97+
98+
@Test
99+
public void testMultiThreadedTaskletExecutionWhenRollbackFails() throws Exception {
100+
// given
101+
Class[] configurationClasses = {JobConfiguration.class, RollbackFailingTransactionManagerConfiguration.class};
102+
ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses);
103+
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
104+
Job job = context.getBean(Job.class);
105+
106+
// when
107+
JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
108+
109+
// then
110+
assertNotNull(jobExecution);
111+
assertEquals(BatchStatus.UNKNOWN, jobExecution.getStatus());
112+
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
113+
assertEquals(BatchStatus.UNKNOWN, stepExecution.getStatus());
114+
Throwable e = stepExecution.getFailureExceptions().get(0);
115+
assertEquals("Planned rollback exception!", e.getMessage());
116+
// No assertions on execution context because it is undefined in this case
117+
}
118+
119+
@Configuration
120+
@EnableBatchProcessing
121+
public static class JobConfiguration {
122+
123+
@Autowired
124+
private JobBuilderFactory jobBuilderFactory;
125+
@Autowired
126+
private StepBuilderFactory stepBuilderFactory;
127+
128+
@Bean
129+
public TaskletStep step() {
130+
return stepBuilderFactory.get("step")
131+
.<Integer, Integer>chunk(3)
132+
.reader(itemReader())
133+
.writer(itemWriter())
134+
.taskExecutor(taskExecutor())
135+
.build();
136+
}
137+
138+
@Bean
139+
public Job job(ThreadPoolTaskExecutor taskExecutor) {
140+
return jobBuilderFactory.get("job")
141+
.start(step())
142+
.listener(new JobExecutionListenerSupport() {
143+
@Override
144+
public void afterJob(JobExecution jobExecution) {
145+
taskExecutor.shutdown();
146+
}
147+
})
148+
.build();
149+
}
150+
151+
@Bean
152+
public ThreadPoolTaskExecutor taskExecutor() {
153+
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
154+
taskExecutor.setCorePoolSize(3);
155+
taskExecutor.setMaxPoolSize(3);
156+
taskExecutor.setThreadNamePrefix("spring-batch-worker-thread-");
157+
return taskExecutor;
158+
}
159+
160+
@Bean
161+
public ItemReader<Integer> itemReader() {
162+
return new ItemReader<Integer>() {
163+
private AtomicInteger atomicInteger = new AtomicInteger();
164+
165+
@Override
166+
public synchronized Integer read() {
167+
int value = atomicInteger.incrementAndGet();
168+
return value <= 9 ? value : null;
169+
}
170+
};
171+
}
172+
173+
@Bean
174+
public ItemWriter<Integer> itemWriter() {
175+
return items -> {
176+
};
177+
}
178+
}
179+
180+
@Configuration
181+
public static class DataSourceConfiguration {
182+
183+
@Bean
184+
public DataSource dataSource() {
185+
return new EmbeddedDatabaseBuilder()
186+
.setType(EmbeddedDatabaseType.HSQL)
187+
.addScript("org/springframework/batch/core/schema-drop-hsqldb.sql")
188+
.addScript("org/springframework/batch/core/schema-hsqldb.sql")
189+
.build();
190+
}
191+
192+
}
193+
194+
@Configuration
195+
@Import(DataSourceConfiguration.class)
196+
public static class TransactionManagerConfiguration {
197+
198+
@Bean
199+
public PlatformTransactionManager transactionManager(DataSource dataSource) {
200+
return new DataSourceTransactionManager(dataSource);
201+
}
202+
203+
}
204+
205+
@Configuration
206+
@Import(DataSourceConfiguration.class)
207+
public static class CommitFailingTransactionManagerConfiguration {
208+
209+
@Bean
210+
public PlatformTransactionManager transactionManager(DataSource dataSource) {
211+
return new DataSourceTransactionManager(dataSource) {
212+
@Override
213+
protected void doCommit(DefaultTransactionStatus status) {
214+
super.doCommit(status);
215+
if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) {
216+
throw new RuntimeException("Planned commit exception!");
217+
}
218+
}
219+
};
220+
}
221+
222+
}
223+
224+
@Configuration
225+
@Import(DataSourceConfiguration.class)
226+
public static class RollbackFailingTransactionManagerConfiguration {
227+
228+
@Bean
229+
public PlatformTransactionManager transactionManager(DataSource dataSource) {
230+
return new DataSourceTransactionManager(dataSource) {
231+
@Override
232+
protected void doCommit(DefaultTransactionStatus status) {
233+
super.doCommit(status);
234+
if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) {
235+
throw new RuntimeException("Planned commit exception!");
236+
}
237+
}
238+
239+
@Override
240+
protected void doRollback(DefaultTransactionStatus status) {
241+
super.doRollback(status);
242+
if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) {
243+
throw new RuntimeException("Planned rollback exception!");
244+
}
245+
}
246+
};
247+
}
248+
249+
}
250+
251+
}

spring-batch-docs/asciidoc/scalability.adoc

+10-4
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ your step, such as a `DataSource`. Be sure to make the pool in those resources
123123
as large as the desired number of concurrent threads in the step.
124124

125125
There are some practical limitations of using multi-threaded `Step` implementations for
126-
some common batch use cases. Many participants in a `Step` (such as readers and writers)
126+
some common batch use cases:
127+
128+
* Many participants in a `Step` (such as readers and writers)
127129
are stateful. If the state is not segregated by thread, then those components are not
128130
usable in a multi-threaded `Step`. In particular, most of the off-the-shelf readers and
129131
writers from Spring Batch are not designed for multi-threaded use. It is, however,
@@ -132,9 +134,8 @@ possible to work with stateless or thread safe readers and writers, and there is
132134
https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples[Spring
133135
Batch Samples] that shows the use of a process indicator (see
134136
<<readersAndWriters.adoc#process-indicator,Preventing State Persistence>>) to keep track
135-
of items that have been processed in a database input table.
136-
137-
Spring Batch provides some implementations of `ItemWriter` and `ItemReader`. Usually,
137+
of items that have been processed in a database input table. Spring Batch provides some
138+
implementations of `ItemWriter` and `ItemReader`. Usually,
138139
they say in the Javadoc if they are thread safe or not or what you have to do to avoid
139140
problems in a concurrent environment. If there is no information in the Javadoc, you can
140141
check the implementation to see if there is any state. If a reader is not thread safe,
@@ -143,6 +144,11 @@ synchronizing delegator. You can synchronize the call to `read()` and as long as
143144
processing and writing is the most expensive part of the chunk, your step may still
144145
complete much faster than it would in a single threaded configuration.
145146

147+
* In a multi-threaded `Step`, each thread runs in its own transaction and the `ChunkContext`
148+
is shared between threads. This shared state might end up in an inconsistent state
149+
if one of the transactions is rolled back. Hence, we recommend avoiding `ExecutionContext`
150+
manipulation in a multi-threaded `Step`.
151+
146152
[[scalabilityParallelSteps]]
147153

148154

0 commit comments

Comments
 (0)