Skip to content

Commit 98070e4

Browse files
committed
Polish and updated a test to validate the conditions
1 parent ee56a23 commit 98070e4

File tree

5 files changed

+149
-31
lines changed

5 files changed

+149
-31
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/FaultTolerantStepBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ protected ChunkProvider<I> createChunkProvider() {
434434

435435
SkipPolicy readSkipPolicy = createSkipPolicy();
436436
readSkipPolicy = getFatalExceptionAwareProxy(readSkipPolicy);
437-
FaultTolerantChunkProvider<I> chunkProvider = new FaultTolerantChunkProvider<I>(getReader(),
437+
FaultTolerantChunkProvider<I> chunkProvider = new FaultTolerantChunkProvider<>(getReader(),
438438
createChunkOperations());
439439
chunkProvider.setMaxSkipsOnRead(Math.max(getChunkSize(), FaultTolerantChunkProvider.DEFAULT_MAX_SKIPS_ON_READ));
440440
chunkProvider.setSkipPolicy(readSkipPolicy);

spring-batch-docs/asciidoc/spring-batch-integration.adoc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -939,7 +939,7 @@ For more information, see the section of the "Scalability" chapter on
939939
link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking].
940940

941941
Starting from version 4.1, Spring Batch Integration introduces the `@EnableBatchIntegration`
942-
annotation that can be used to simplify remote chunking setup. This annotation provides
942+
annotation that can be used to simplify a remote chunking setup. This annotation provides
943943
two beans that can be autowired in the application context:
944944

945945
* `RemoteChunkingMasterStepBuilderFactory`: used to configure the master step
@@ -957,8 +957,8 @@ configure a master step by declaring:
957957
* the output channel ("Outgoing requests") to send requests to workers
958958
* the input channel ("Incoming replies") to receive replies from workers
959959

960-
There is no need anymore to explicitly configure the `ChunkMessageChannelItemWriter`
961-
and the `MessagingTemplate` (Those can still be explicitly configured if required).
960+
A `ChunkMessageChannelItemWriter` and the `MessagingTemplate` are not needed to be explicitly configured
961+
(Those can still be explicitly configured if required).
962962

963963
On the worker side, the `RemoteChunkingWorkerBuilder` allows you to configure a worker to:
964964

@@ -967,14 +967,15 @@ On the worker side, the `RemoteChunkingWorkerBuilder` allows you to configure a
967967
with the configured `ItemProcessor` and `ItemWriter`
968968
* send replies on the output channel ("Outgoing replies") to the master
969969

970-
There is no need anymore to explicitly configure the `SimpleChunkProcessor`
971-
and the `ChunkProcessorChunkHandler` (Those can still be explicitly configured if required).
970+
There is no need to explicitly configure the `SimpleChunkProcessor`
971+
and the `ChunkProcessorChunkHandler` (Those can be explicitly configured if required).
972972

973973
The following example shows how to use these APIs:
974974

975975
[source, java]
976976
----
977977
@EnableBatchIntegration
978+
@EnableBatchProcessing
978979
public class RemoteChunkingJobConfiguration {
979980
980981
@Configuration

spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.batch.core.step.item.KeyGenerator;
2727
import org.springframework.batch.core.step.skip.SkipPolicy;
2828
import org.springframework.batch.core.step.tasklet.TaskletStep;
29+
import org.springframework.batch.item.ItemProcessor;
2930
import org.springframework.batch.item.ItemReader;
3031
import org.springframework.batch.item.ItemStream;
3132
import org.springframework.batch.item.ItemWriter;
@@ -400,4 +401,10 @@ public RemoteChunkingMasterStepBuilder<I, O> allowStartIfComplete(boolean allowS
400401
super.allowStartIfComplete(allowStartIfComplete);
401402
return this;
402403
}
404+
405+
@Override
406+
public RemoteChunkingMasterStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> itemProcessor) {
407+
super.processor(itemProcessor);
408+
return this;
409+
}
403410
}

spring-batch-integration/src/main/java/org/springframework/batch/integration/config/annotation/EnableBatchIntegration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
* <pre class="code">
4646
* &#064;Configuration
4747
* &#064;EnableBatchIntegration
48+
* &#064;EnableBatchProcessing
4849
* public class RemoteChunkingAppConfig {
4950
*
5051
* &#064;Autowired

spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilderTest.java

Lines changed: 134 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,40 +15,57 @@
1515
*/
1616
package org.springframework.batch.integration.chunk;
1717

18+
import java.io.IOException;
1819
import java.util.Arrays;
20+
import java.util.List;
1921

2022
import org.junit.Assert;
2123
import org.junit.Rule;
2224
import org.junit.Test;
2325
import org.junit.rules.ExpectedException;
2426
import org.junit.runner.RunWith;
2527

28+
import org.springframework.batch.core.ChunkListener;
29+
import org.springframework.batch.core.ItemReadListener;
30+
import org.springframework.batch.core.ItemWriteListener;
31+
import org.springframework.batch.core.JobExecution;
32+
import org.springframework.batch.core.JobParameters;
33+
import org.springframework.batch.core.SkipListener;
34+
import org.springframework.batch.core.StepExecution;
35+
import org.springframework.batch.core.StepExecutionListener;
2636
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
27-
import org.springframework.batch.core.listener.ChunkListenerSupport;
28-
import org.springframework.batch.core.listener.CompositeItemReadListener;
29-
import org.springframework.batch.core.listener.CompositeItemWriteListener;
30-
import org.springframework.batch.core.listener.SkipListenerSupport;
31-
import org.springframework.batch.core.listener.StepExecutionListenerSupport;
3237
import org.springframework.batch.core.repository.JobRepository;
38+
import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
39+
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
40+
import org.springframework.batch.core.step.item.SimpleChunkProvider;
3341
import org.springframework.batch.core.step.tasklet.TaskletStep;
42+
import org.springframework.batch.item.ItemProcessor;
3443
import org.springframework.batch.item.ItemReader;
3544
import org.springframework.batch.item.ItemStreamSupport;
45+
import org.springframework.batch.item.ItemWriter;
46+
import org.springframework.batch.item.support.CompositeItemStream;
3647
import org.springframework.batch.item.support.ListItemReader;
37-
import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
3848
import org.springframework.batch.repeat.support.RepeatTemplate;
3949
import org.springframework.beans.factory.annotation.Autowired;
4050
import org.springframework.context.annotation.Configuration;
4151
import org.springframework.integration.channel.DirectChannel;
4252
import org.springframework.integration.channel.QueueChannel;
53+
import org.springframework.integration.core.MessagingTemplate;
4354
import org.springframework.messaging.PollableChannel;
55+
import org.springframework.retry.RetryListener;
4456
import org.springframework.retry.backoff.NoBackOffPolicy;
45-
import org.springframework.retry.listener.RetryListenerSupport;
4657
import org.springframework.retry.policy.MapRetryContextCache;
4758
import org.springframework.test.context.ContextConfiguration;
4859
import org.springframework.test.context.junit4.SpringRunner;
60+
import org.springframework.test.util.ReflectionTestUtils;
4961
import org.springframework.transaction.PlatformTransactionManager;
5062
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
5163

64+
import static org.mockito.Mockito.any;
65+
import static org.mockito.Mockito.atLeastOnce;
66+
import static org.mockito.Mockito.mock;
67+
import static org.mockito.Mockito.verify;
68+
import static org.mockito.Mockito.when;
5269
/**
5370
* @author Mahmoud Ben Hassine
5471
*/
@@ -216,44 +233,136 @@ public void testMasterStepCreation() {
216233
* The following test is to cover setters that override those from parent builders.
217234
*/
218235
@Test
219-
public void testSetters() {
236+
public void testSetters() throws Exception {
220237
// when
238+
DefaultTransactionAttribute transactionAttribute = new DefaultTransactionAttribute();
239+
240+
Object annotatedListener = new Object();
241+
MapRetryContextCache retryCache = new MapRetryContextCache();
242+
RepeatTemplate stepOperations = new RepeatTemplate();
243+
NoBackOffPolicy backOffPolicy = new NoBackOffPolicy();
244+
ItemStreamSupport stream = new ItemStreamSupport() {
245+
};
246+
StepExecutionListener stepExecutionListener = mock(StepExecutionListener.class);
247+
ItemReadListener<String> itemReadListener = mock(ItemReadListener.class);
248+
ItemWriteListener<String> itemWriteListener = mock(ItemWriteListener.class);
249+
ChunkListener chunkListener = mock(ChunkListener.class);
250+
SkipListener<String, String> skipListener = mock(SkipListener.class);
251+
RetryListener retryListener = mock(RetryListener.class);
252+
253+
when(retryListener.open(any(), any())).thenReturn(true);
254+
255+
ItemProcessor<String, String> itemProcessor = item -> {
256+
System.out.println("processing item " + item);
257+
if(item.equals("b")) {
258+
throw new Exception("b was found");
259+
}
260+
else {
261+
return item;
262+
}
263+
};
264+
265+
ItemReader<String> itemReader = new ItemReader<String>() {
266+
267+
int count = 0;
268+
List<String> items = Arrays.asList("a", "b", "c", "d", "d", "e", "f", "g", "h", "i");
269+
270+
@Override
271+
public String read() throws Exception {
272+
System.out.println(">> count == " + count);
273+
if(count == 6) {
274+
count++;
275+
throw new IOException("6th item");
276+
}
277+
else if(count == 7) {
278+
count++;
279+
throw new RuntimeException("7th item");
280+
}
281+
else if(count < items.size()){
282+
String item = items.get(count++);
283+
System.out.println(">> item read was " + item);
284+
return item;
285+
}
286+
else {
287+
return null;
288+
}
289+
}
290+
};
291+
221292
TaskletStep taskletStep = new RemoteChunkingMasterStepBuilder<String, String>("step")
222-
.reader(this.itemReader)
293+
.reader(itemReader)
223294
.readerIsTransactionalQueue()
295+
.processor(itemProcessor)
224296
.repository(this.jobRepository)
225297
.transactionManager(this.transactionManager)
226-
.transactionAttribute(new DefaultTransactionAttribute())
298+
.transactionAttribute(transactionAttribute)
227299
.inputChannel(this.inputChannel)
228300
.outputChannel(this.outputChannel)
229-
.listener(new Object())
230-
.listener(new SkipListenerSupport<>())
231-
.listener(new ChunkListenerSupport())
232-
.listener(new StepExecutionListenerSupport())
233-
.listener(new CompositeItemReadListener<>())
234-
.listener(new CompositeItemWriteListener<>())
235-
.listener(new RetryListenerSupport())
301+
.listener(annotatedListener)
302+
.listener(skipListener)
303+
.listener(chunkListener)
304+
.listener(stepExecutionListener)
305+
.listener(itemReadListener)
306+
.listener(itemWriteListener)
307+
.listener(retryListener)
236308
.skip(Exception.class)
237309
.noSkip(RuntimeException.class)
238310
.skipLimit(10)
239-
.retry(Exception.class)
311+
.retry(IOException.class)
240312
.noRetry(RuntimeException.class)
241313
.retryLimit(10)
242-
.retryContextCache(new MapRetryContextCache())
314+
.retryContextCache(retryCache)
243315
.noRollback(Exception.class)
244-
.chunk(10)
245316
.startLimit(3)
246317
.allowStartIfComplete(true)
247-
.exceptionHandler(new DefaultExceptionHandler())
248-
.stepOperations(new RepeatTemplate())
249-
.chunkOperations(new RepeatTemplate())
250-
.backOffPolicy(new NoBackOffPolicy())
251-
.stream(new ItemStreamSupport() {})
318+
.stepOperations(stepOperations)
319+
.chunk(3)
320+
.backOffPolicy(backOffPolicy)
321+
.stream(stream)
252322
.keyGenerator(Object::hashCode)
253323
.build();
254324

325+
JobExecution jobExecution = this.jobRepository.createJobExecution("job1", new JobParameters());
326+
StepExecution stepExecution = new StepExecution("step1", jobExecution);
327+
this.jobRepository.add(stepExecution);
328+
329+
taskletStep.execute(stepExecution);
330+
255331
// then
256332
Assert.assertNotNull(taskletStep);
333+
ChunkOrientedTasklet tasklet = (ChunkOrientedTasklet) ReflectionTestUtils.getField(taskletStep, "tasklet");
334+
SimpleChunkProvider provider = (SimpleChunkProvider) ReflectionTestUtils.getField(tasklet, "chunkProvider");
335+
SimpleChunkProcessor processor = (SimpleChunkProcessor) ReflectionTestUtils.getField(tasklet, "chunkProcessor");
336+
ItemWriter itemWriter = (ItemWriter) ReflectionTestUtils.getField(processor, "itemWriter");
337+
MessagingTemplate messagingTemplate = (MessagingTemplate) ReflectionTestUtils.getField(itemWriter, "messagingGateway");
338+
CompositeItemStream compositeItemStream = (CompositeItemStream) ReflectionTestUtils.getField(taskletStep, "stream");
339+
340+
Assert.assertEquals(ReflectionTestUtils.getField(provider, "itemReader"), itemReader);
341+
Assert.assertFalse((Boolean) ReflectionTestUtils.getField(tasklet, "buffering"));
342+
Assert.assertEquals(ReflectionTestUtils.getField(taskletStep, "jobRepository"), this.jobRepository);
343+
Assert.assertEquals(ReflectionTestUtils.getField(taskletStep, "transactionManager"), this.transactionManager);
344+
Assert.assertEquals(ReflectionTestUtils.getField(taskletStep, "transactionAttribute"), transactionAttribute);
345+
Assert.assertEquals(ReflectionTestUtils.getField(itemWriter, "replyChannel"), this.inputChannel);
346+
Assert.assertEquals(ReflectionTestUtils.getField(messagingTemplate, "defaultDestination"), this.outputChannel);
347+
Assert.assertEquals(ReflectionTestUtils.getField(processor, "itemProcessor"), itemProcessor);
348+
349+
Assert.assertEquals((int) ReflectionTestUtils.getField(taskletStep, "startLimit"), 3);
350+
Assert.assertTrue((Boolean) ReflectionTestUtils.getField(taskletStep, "allowStartIfComplete"));
351+
Object stepOperationsUsed = ReflectionTestUtils.getField(taskletStep, "stepOperations");
352+
Assert.assertEquals(stepOperationsUsed, stepOperations);
353+
354+
Assert.assertEquals(((List)ReflectionTestUtils.getField(compositeItemStream, "streams")).size(), 2);
355+
Assert.assertNotNull(ReflectionTestUtils.getField(processor, "keyGenerator"));
356+
357+
verify(skipListener, atLeastOnce()).onSkipInProcess(any(), any());
358+
verify(retryListener, atLeastOnce()).open(any(), any());
359+
verify(stepExecutionListener, atLeastOnce()).beforeStep(any());
360+
verify(chunkListener, atLeastOnce()).beforeChunk(any());
361+
verify(itemReadListener, atLeastOnce()).beforeRead();
362+
verify(itemWriteListener, atLeastOnce()).beforeWrite(any());
363+
364+
Assert.assertEquals(stepExecution.getSkipCount(), 2);
365+
Assert.assertEquals(stepExecution.getRollbackCount(), 3);
257366
}
258367

259368
@Configuration

0 commit comments

Comments
 (0)