diff --git a/build.gradle b/build.gradle index b0e139713c..84fc6a9f37 100644 --- a/build.gradle +++ b/build.gradle @@ -553,7 +553,7 @@ project('spring-batch-samples') { dependencies { - compile project(":spring-batch-core") + compile project(":spring-batch-integration") compile "org.aspectj:aspectjrt:$aspectjVersion" compile "org.aspectj:aspectjweaver:$aspectjVersion" compile "org.quartz-scheduler:quartz:$quartzVersion" @@ -583,6 +583,8 @@ project('spring-batch-samples') { compile "org.springframework:spring-tx:$springVersion" compile "org.springframework.data:spring-data-jpa:$springDataJpaVersion" compile "javax.mail:javax.mail-api:$javaMailVersion" + compile "org.apache.activemq:activemq-client:$activemqVersion" + compile "org.apache.activemq:activemq-broker:$activemqVersion" testCompile "org.xmlunit:xmlunit-core:$xmlunitVersion" testCompile "org.xmlunit:xmlunit-matchers:$xmlunitVersion" @@ -596,6 +598,7 @@ project('spring-batch-samples') { testCompile "org.codehaus.groovy:groovy-ant:$groovyVersion" testCompile "org.springframework:spring-test:$springVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" + testCompile "org.apache.activemq:activemq-kahadb-store:$activemqVersion" testRuntime "com.sun.mail:javax.mail:$javaMailVersion" diff --git a/spring-batch-docs/asciidoc/images/remote-chunking-sbi.png b/spring-batch-docs/asciidoc/images/remote-chunking-sbi.png index 0f2737bcf5..b1510778e5 100644 Binary files a/spring-batch-docs/asciidoc/images/remote-chunking-sbi.png and b/spring-batch-docs/asciidoc/images/remote-chunking-sbi.png differ diff --git a/spring-batch-docs/asciidoc/spring-batch-integration.adoc b/spring-batch-docs/asciidoc/spring-batch-integration.adoc index 7ccfa0bfa1..5eba5b80f3 100644 --- a/spring-batch-docs/asciidoc/spring-batch-integration.adoc +++ b/spring-batch-docs/asciidoc/spring-batch-integration.adoc @@ -706,10 +706,10 @@ configuration similar to the following: .Java Configuration [source, java, role="javaContent"] ---- -public Job chunkJob(ItemReader itemReader) { +public Job chunkJob() { return jobBuilderFactory.get("personJob") .start(stepBuilderFactory.get("step1") - .chunk(200) + .chunk(200) .reader(itemReader()) .writer(itemWriter()) .build()) @@ -722,7 +722,7 @@ to use for reading data on the master. The `ItemWriter` reference points to a special `ItemWriter` (called `ChunkMessageChannelItemWriter`), as described above. The processor (if any) is left off the -master configuration, as it is configured on the slave. The +master configuration, as it is configured on the worker. The following configuration provides a basic master setup. You should check any additional component properties, such as throttle limits and so on, when implementing your use case. @@ -734,7 +734,7 @@ throttle limits and so on, when implementing your use case. - + @@ -749,12 +749,6 @@ throttle limits and so on, when implementing your use case. - - - - - @@ -774,43 +768,26 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() { return factory; } +/* + * Configure outbound flow (requests going to workers) + */ + @Bean public DirectChannel requests() { return new DirectChannel(); } @Bean -public IntegrationFlow jmsOutboundFlow() { - return IntegrationFlows.from("requests") - .handle(Jms.outboundGateway(connectionFactory()) - .requestDestination("requests")) +public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { + return IntegrationFlows + .from(requests()) + .handle(Jms.outboundAdapter(connectionFactory).destination("requests")) .get(); } -@Bean -public MessagingTemplate messagingTemplate() { - MessagingTemplate template = new MessagingTemplate(); - template.setDefaultChannel(requests()); - template.setReceiveTimeout(2000); - return template; -} - -@Bean -@StepScope -public ChunkMessageChannelItemWriter itemWriter() { - ChunkMessageChannelItemWriter chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter(); - chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate()); - chunkMessageChannelItemWriter.setReplyChannel(replies()); - return chunkMessageChannelItemWriter; -} - -@Bean -public RemoteChunkHandlerFactoryBean chunkHandler() { - RemoteChunkHandlerFactoryBean remoteChunkHandlerFactoryBean = new RemoteChunkHandlerFactoryBean(); - remoteChunkHandlerFactoryBean.setChunkWriter(itemWriter()); - remoteChunkHandlerFactoryBean.setStep(step1()); - return remoteChunkHandlerFactoryBean; -} +/* + * Configure inbound flow (replies coming from workers) + */ @Bean public QueueChannel replies() { @@ -818,14 +795,28 @@ public QueueChannel replies() { } @Bean -public IntegrationFlow jmsReplies() { +public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlows - .from(Jms.messageDrivenChannelAdapter(connectionFactory()) - .configureListenerContainer(c -> c.subscriptionDurable(false)) - .destination("replies")) + .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies")) .channel(replies()) .get(); } + +/* + * Configure the ChunkMessageChannelItemWriter + */ + +@Bean +public ItemWriter itemWriter() { + MessagingTemplate messagingTemplate = new MessagingTemplate(); + messagingTemplate.setDefaultChannel(requests()); + messagingTemplate.setReceiveTimeout(2000); + ChunkMessageChannelItemWriter chunkMessageChannelItemWriter + = new ChunkMessageChannelItemWriter<>(); + chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate); + chunkMessageChannelItemWriter.setReplyChannel(replies()); + return chunkMessageChannelItemWriter; +} ---- The preceding configuration provides us with a number of beans. We @@ -836,7 +827,7 @@ referenced by our job step, uses the `ChunkMessageChannelItemWriter` for writing chunks over the configured middleware. -Now we can move on to the slave configuration, as shown in the following example: +Now we can move on to the worker configuration, as shown in the following example: .XML Configuration @@ -849,7 +840,7 @@ Now we can move on to the slave configuration, as shown in the following example - @@ -889,65 +880,72 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() { return factory; } +/* + * Configure inbound flow (requests coming from the master) + */ + @Bean public DirectChannel requests() { return new DirectChannel(); } -@Bean -public DirectChannel replies() { - return new DirectChannel(); -} @Bean -public IntegrationFlow jmsIn() { +public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlows - .from(Jms.messageDrivenChannelAdapter(connectionFactory()) - .configureListenerContainer(c -> c.subscriptionDurable(false)) - .destination("requests")) + .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests")) .channel(requests()) .get(); } +/* + * Configure outbound flow (replies going to the master) + */ + @Bean -public IntegrationFlow outgoingReplies() { - return IntegrationFlows.from("replies") - .handle(Jms.outboundGateway(connectionFactory()) - .requestDestination("replies")) - .get(); +public DirectChannel replies() { + return new DirectChannel(); } @Bean -@ServiceActivator(inputChannel = "requests") -public AggregatorFactoryBean serviceActivator() throws Exception{ - AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean(); - aggregatorFactoryBean.setProcessorBean(chunkProcessorChunkHandler()); - aggregatorFactoryBean.setOutputChannel(replies()); - ... - return aggregatorFactoryBean; +public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { + return IntegrationFlows + .from(replies()) + .handle(Jms.outboundAdapter(connectionFactory).destination("replies")) + .get(); } +/* + * Configure the ChunkProcessorChunkHandler + */ + @Bean -public ChunkProcessorChunkHandler chunkProcessorChunkHandler() { - ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler(); - chunkProcessorChunkHandler.setChunkProcessor(new SimpleChunkProcessor(personItemProcessor(), personItemWriter())); +@ServiceActivator(inputChannel = "requests", outputChannel = "replies") +public ChunkProcessorChunkHandler chunkProcessorChunkHandler() { + ChunkProcessor chunkProcessor + = new SimpleChunkProcessor<>(itemProcessor(), itemWriter()); + ChunkProcessorChunkHandler chunkProcessorChunkHandler + = new ChunkProcessorChunkHandler<>(); + chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor); return chunkProcessorChunkHandler; } ---- Most of these configuration items should look familiar from the -master configuration. Slaves do not need access to +master configuration. Workers do not need access to the Spring Batch `JobRepository` nor to the actual job configuration file. The main bean of interest is the `chunkProcessorChunkHandler`. The `chunkProcessor` property of `ChunkProcessorChunkHandler` takes a configured `SimpleChunkProcessor`, which is where you would provide a reference to your `ItemWriter` (and, optionally, your -`ItemProcessor`) that will run on the slave +`ItemProcessor`) that will run on the worker when it receives chunks from the master. For more information, see the section of the "Scalability" chapter on link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking]. +You can find a complete example of a remote chunking job +link:$$https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample$$[here]. [[remote-partitioning]] @@ -960,8 +958,8 @@ image::{batch-asciidoc}images/remote-partitioning.png[Remote Partitioning, scale Remote Partitioning, on the other hand, is useful when it is not the processing of items but rather the associated I/O that causes the bottleneck. Using Remote Partitioning, work can -be farmed out to slaves that execute complete Spring Batch -steps. Thus, each slave has its own `ItemReader`, `ItemProcessor`, and +be farmed out to workers that execute complete Spring Batch +steps. Thus, each worker has its own `ItemReader`, `ItemProcessor`, and `ItemWriter`. For this purpose, Spring Batch Integration provides the `MessageChannelPartitionHandler`. diff --git a/spring-batch-samples/README.md b/spring-batch-samples/README.md index 78cfb34b84..9daf02c456 100644 --- a/spring-batch-samples/README.md +++ b/spring-batch-samples/README.md @@ -39,6 +39,7 @@ Job/Feature | skip | retry | restart | aut [multilineOrder](#multilineOrder) | | | | | | | X | | | | [parallel](#parallel) | | | | | | | | | | X | [partition](#partition) | | | | | | | | | | X | +[remoteChunking](#remoteChunking) | | | | | | | | | | X | [quartz](#quartz) | | | | | X | | | | | | [restart](#restart) | | | X | | | | | | | | [retry](#retry) | | X | | | | | | | | | @@ -644,6 +645,23 @@ the work. Notice that the readers and writers in the `Step` that is being partitioned are step-scoped, so that their state does not get shared across threads of execution. +### [Remote Chunking Sample](id:remoteChunking) + +This sample shows how to configure a remote chunking job. The master step will +read numbers from 1 to 6 and send two chunks ({1, 2, 3} and {4, 5, 6}) to workers +for processing and writing. + +This example shows how to: + +* configure a `ChunkMessageChannelItemWriter` on the master side to send chunks to workers +* configure a `ChunkProcessorChunkHandler` on the worker side to process chunks and +send replies back to the master + +The sample uses an embedded JMS broker as a communication middleware between the +master and workers. The usage of an embedded broker is only for simplicity's sake, +the communication between the master and workers is still done through JMS queues +and Spring Integration channels and messages are sent over the wire through a TCP port. + ### [Quartz Sample](id:quartz) The goal is to demonstrate how to schedule job execution using diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/MasterConfiguration.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/MasterConfiguration.java new file mode 100644 index 0000000000..4441033773 --- /dev/null +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/MasterConfiguration.java @@ -0,0 +1,139 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.sample.remotechunking; + +import java.util.Arrays; + +import org.apache.activemq.ActiveMQConnectionFactory; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter; +import org.springframework.batch.item.support.ListItemReader; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.core.MessagingTemplate; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.jms.dsl.Jms; + +/** + * This configuration class is for the master side of the remote chunking sample. + * The master step reads numbers from 1 to 6 and sends 2 chunks {1, 2, 3} and + * {4, 5, 6} to workers for processing and writing. + * + * @author Mahmoud Ben Hassine + */ +@Configuration +@EnableBatchProcessing +@EnableIntegration +@PropertySource("classpath:remote-chunking.properties") +public class MasterConfiguration { + + @Value("${broker.url}") + private String brokerUrl; + + @Autowired + private JobBuilderFactory jobBuilderFactory; + + @Autowired + private StepBuilderFactory stepBuilderFactory; + + @Bean + public ActiveMQConnectionFactory connectionFactory() { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + connectionFactory.setBrokerURL(this.brokerUrl); + connectionFactory.setTrustAllPackages(true); + return connectionFactory; + } + + /* + * Configure outbound flow (requests going to workers) + */ + @Bean + public DirectChannel requests() { + return new DirectChannel(); + } + + @Bean + public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { + return IntegrationFlows + .from(requests()) + .handle(Jms.outboundAdapter(connectionFactory).destination("requests")) + .get(); + } + + /* + * Configure inbound flow (replies coming from workers) + */ + @Bean + public QueueChannel replies() { + return new QueueChannel(); + } + + @Bean + public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { + return IntegrationFlows + .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies")) + .channel(replies()) + .get(); + } + + /* + * Configure master step components + */ + @Bean + public ListItemReader itemReader() { + return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6)); + } + + @Bean + public ChunkMessageChannelItemWriter itemWriter() { + MessagingTemplate messagingTemplate = new MessagingTemplate(); + messagingTemplate.setDefaultChannel(requests()); + ChunkMessageChannelItemWriter chunkMessageChannelItemWriter + = new ChunkMessageChannelItemWriter<>(); + chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate); + chunkMessageChannelItemWriter.setReplyChannel(replies()); + return chunkMessageChannelItemWriter; + } + + @Bean + public TaskletStep masterStep() { + return this.stepBuilderFactory.get("masterStep") + .chunk(3) + .reader(itemReader()) + .writer(itemWriter()) + .build(); + } + + @Bean + public Job remoteChunkingJob() { + return this.jobBuilderFactory.get("remoteChunkingJob") + .start(masterStep()) + .build(); + } + +} diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java new file mode 100644 index 0000000000..84363e7b68 --- /dev/null +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java @@ -0,0 +1,126 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.sample.remotechunking; + +import org.apache.activemq.ActiveMQConnectionFactory; + +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.step.item.ChunkProcessor; +import org.springframework.batch.core.step.item.SimpleChunkProcessor; +import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.jms.dsl.Jms; + +/** + * This configuration class is for the worker side of the remote chunking sample. + * It configures a {@link ChunkProcessorChunkHandler} as a service activator to: + *
    + *
  • receive requests from the master
  • + *
  • process chunks with the configured item processor and writer
  • + *
  • send replies to the master
  • + *
+ * + * @author Mahmoud Ben Hassine + */ +@Configuration +@EnableBatchProcessing +@EnableIntegration +@PropertySource("classpath:remote-chunking.properties") +public class WorkerConfiguration { + + @Value("${broker.url}") + private String brokerUrl; + + @Bean + public ActiveMQConnectionFactory connectionFactory() { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + connectionFactory.setBrokerURL(this.brokerUrl); + connectionFactory.setTrustAllPackages(true); + return connectionFactory; + } + + /* + * Configure inbound flow (requests coming from the master) + */ + @Bean + public DirectChannel requests() { + return new DirectChannel(); + } + + @Bean + public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { + return IntegrationFlows + .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests")) + .channel(requests()) + .get(); + } + + /* + * Configure outbound flow (replies going to the master) + */ + @Bean + public DirectChannel replies() { + return new DirectChannel(); + } + + @Bean + public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { + return IntegrationFlows + .from(replies()) + .handle(Jms.outboundAdapter(connectionFactory).destination("replies")) + .get(); + } + + /* + * Configure worker components + */ + @Bean + public ItemProcessor itemProcessor() { + return item -> { + System.out.println("processing item " + item); + return item; + }; + } + + @Bean + public ItemWriter itemWriter() { + return items -> { + for (Integer item : items) { + System.out.println("writing item " + item); + } + }; + } + + @Bean + @ServiceActivator(inputChannel = "requests", outputChannel = "replies") + public ChunkProcessorChunkHandler chunkProcessorChunkHandler() { + ChunkProcessor chunkProcessor = new SimpleChunkProcessor<>(itemProcessor(), itemWriter()); + ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>(); + chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor); + return chunkProcessorChunkHandler; + } + +} diff --git a/spring-batch-samples/src/main/resources/remote-chunking.properties b/spring-batch-samples/src/main/resources/remote-chunking.properties new file mode 100644 index 0000000000..0a85c22526 --- /dev/null +++ b/spring-batch-samples/src/main/resources/remote-chunking.properties @@ -0,0 +1 @@ +broker.url=tcp://localhost:61616 diff --git a/spring-batch-samples/src/test/java/org/springframework/batch/sample/RemoteChunkingJobFunctionalTests.java b/spring-batch-samples/src/test/java/org/springframework/batch/sample/RemoteChunkingJobFunctionalTests.java new file mode 100644 index 0000000000..d89b33b2c0 --- /dev/null +++ b/spring-batch-samples/src/test/java/org/springframework/batch/sample/RemoteChunkingJobFunctionalTests.java @@ -0,0 +1,88 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.sample; + +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.sample.config.JobRunnerConfiguration; +import org.springframework.batch.sample.remotechunking.MasterConfiguration; +import org.springframework.batch.sample.remotechunking.WorkerConfiguration; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.PropertySource; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * The master step of the job under test will read data and send chunks to the worker + * (started in {@link RemoteChunkingJobFunctionalTests#setUp()}) for processing and writing. + * + * @author Mahmoud Ben Hassine + */ +@RunWith(SpringRunner.class) +@ContextConfiguration(classes = {JobRunnerConfiguration.class, MasterConfiguration.class}) +@PropertySource("classpath:remote-chunking.properties") +public class RemoteChunkingJobFunctionalTests { + + private static final String BROKER_DATA_DIRECTORY = "build/activemq-data"; + + @Value("${broker.url}") + private String brokerUrl; + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + private BrokerService brokerService; + + private AnnotationConfigApplicationContext workerApplicationContext; + + @Before + public void setUp() throws Exception { + this.brokerService = new BrokerService(); + this.brokerService.addConnector(this.brokerUrl); + this.brokerService.setDataDirectory(BROKER_DATA_DIRECTORY); + this.brokerService.start(); + this.workerApplicationContext = new AnnotationConfigApplicationContext(WorkerConfiguration.class); + } + + @After + public void tearDown() throws Exception { + this.workerApplicationContext.close(); + this.brokerService.stop(); + } + + @Test + public void testRemoteChunkingJob() throws Exception { + // when + JobExecution jobExecution = this.jobLauncherTestUtils.launchJob(); + + // then + Assert.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode()); + Assert.assertEquals( + "Waited for 2 results.", // the master sent 2 chunks ({1, 2, 3} and {4, 5, 6}) to workers + jobExecution.getExitStatus().getExitDescription()); + } + +}