Skip to content

Adapt JobLauncherApplicationRunner to latest changes in Spring Batch 6 #46447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.springframework.batch.core.converter.JobParametersConverter;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.ExecutionContextSerializer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand Down Expand Up @@ -82,9 +81,8 @@ public class BatchAutoConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnBooleanProperty(name = "spring.batch.job.enabled", matchIfMissing = true)
public JobLauncherApplicationRunner jobLauncherApplicationRunner(JobOperator jobOperator,
JobRepository jobRepository, BatchProperties properties) {
JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(jobOperator, jobRepository);
public JobLauncherApplicationRunner jobLauncherApplicationRunner(JobOperator jobOperator, BatchProperties properties) {
JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(jobOperator);
String jobName = properties.getJob().getName();
if (StringUtils.hasText(jobName)) {
runner.setJobName(jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,23 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.converter.JobParametersConverter;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.JobExecution;
import org.springframework.batch.core.job.JobExecutionException;
import org.springframework.batch.core.job.parameters.JobParameter;
import org.springframework.batch.core.job.parameters.JobParameters;
import org.springframework.batch.core.job.parameters.JobParametersBuilder;
import org.springframework.batch.core.job.parameters.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -80,8 +74,6 @@ public class JobLauncherApplicationRunner

private final JobOperator jobOperator;

private final JobRepository jobRepository;

private JobRegistry jobRegistry;

private String jobName;
Expand All @@ -95,14 +87,10 @@ public class JobLauncherApplicationRunner
/**
* Create a new {@link JobLauncherApplicationRunner}.
* @param jobOperator to launch jobs
* @param jobRepository to check if a job instance exists with the given parameters
* when running a job
*/
public JobLauncherApplicationRunner(JobOperator jobOperator, JobRepository jobRepository) {
public JobLauncherApplicationRunner(JobOperator jobOperator) {
Assert.notNull(jobOperator, "'jobOperator' must not be null");
Assert.notNull(jobRepository, "'jobRepository' must not be null");
this.jobOperator = jobOperator;
this.jobRepository = jobRepository;
}

@Override
Expand Down Expand Up @@ -197,46 +185,10 @@ private void executeRegisteredJobs(JobParameters jobParameters) throws JobExecut
protected void execute(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, NoSuchJobException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException {
JobParameters parameters = getNextJobParameters(job, jobParameters);
JobExecution execution = this.jobOperator.start(job, parameters);
JobExecution execution = this.jobOperator.start(job, jobParameters);
if (this.publisher != null) {
this.publisher.publishEvent(new JobExecutionEvent(execution));
}
}

private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) {
if (this.jobRepository != null && this.jobRepository.getJobInstance(job.getName(), jobParameters) != null) {
return getNextJobParametersForExisting(job, jobParameters);
}
if (job.getJobParametersIncrementer() == null) {
return jobParameters;
}
JobParameters nextParameters = new JobParametersBuilder(jobParameters, this.jobRepository)
.getNextJobParameters(job)
.toJobParameters();
return merge(nextParameters, jobParameters);
}

private JobParameters getNextJobParametersForExisting(Job job, JobParameters jobParameters) {
JobExecution lastExecution = this.jobRepository.getLastJobExecution(job.getName(), jobParameters);
if (isStoppedOrFailed(lastExecution) && job.isRestartable()) {
JobParameters previousIdentifyingParameters = new JobParameters(
lastExecution.getJobParameters().getIdentifyingParameters());
return merge(previousIdentifyingParameters, jobParameters);
}
return jobParameters;
}

private boolean isStoppedOrFailed(JobExecution execution) {
BatchStatus status = (execution != null) ? execution.getStatus() : null;
return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED);
}

private JobParameters merge(JobParameters parameters, JobParameters additionals) {
Map<String, JobParameter<?>> merged = new LinkedHashMap<>();
merged.putAll(parameters.getParameters());
merged.putAll(additionals.getParameters());
return new JobParameters(merged);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,7 @@ void defaultJobParametersConverterIsUsed() {
}

private JobLauncherApplicationRunner createInstance(String... registeredJobNames) {
JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(mock(JobOperator.class),
mock(JobRepository.class));
JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(mock(JobOperator.class));
JobRegistry jobRegistry = mock(JobRegistry.class);
given(jobRegistry.getJobNames()).willReturn(Arrays.asList(registeredJobNames));
runner.setJobRegistry(jobRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,15 @@ void incrementExistingExecution() {
this.contextRunner.run((context) -> {
JobLauncherApplicationRunnerContext jobLauncherContext = new JobLauncherApplicationRunnerContext(context);
Job job = jobLauncherContext.configureJob().incrementer(new RunIdIncrementer()).build();
jobLauncherContext.runner.execute(job, new JobParameters());
jobLauncherContext.runner.execute(job, new JobParameters());
JobParameters jobParameters = new JobParametersBuilder().addString("name", "foo").toJobParameters();
jobLauncherContext.runner.execute(job, jobParameters);
jobLauncherContext.runner.execute(job, jobParameters);
assertThat(jobLauncherContext.jobInstances()).hasSize(2);
});
}

@Test
void retryFailedExecution() {
void retryFailedExecutionWithIncrementer() {
this.contextRunner.run((context) -> {
PlatformTransactionManager transactionManager = context.getBean(PlatformTransactionManager.class);
JobLauncherApplicationRunnerContext jobLauncherContext = new JobLauncherApplicationRunnerContext(context);
Expand All @@ -102,7 +103,23 @@ void retryFailedExecution() {
.incrementer(new RunIdIncrementer())
.build();
jobLauncherContext.runner.execute(job, new JobParameters());
jobLauncherContext.runner.execute(job, new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
jobLauncherContext.runner.execute(job, new JobParameters());
// with an incrementer, we always create a new job instance
assertThat(jobLauncherContext.jobInstances()).hasSize(2);
});
}

@Test
void retryFailedExecutionWithoutIncrementer() {
this.contextRunner.run((context) -> {
PlatformTransactionManager transactionManager = context.getBean(PlatformTransactionManager.class);
JobLauncherApplicationRunnerContext jobLauncherContext = new JobLauncherApplicationRunnerContext(context);
Job job = jobLauncherContext.jobBuilder()
.start(jobLauncherContext.stepBuilder().tasklet(throwingTasklet(), transactionManager).build())
.build();
JobParameters jobParameters = new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
jobLauncherContext.runner.execute(job, jobParameters);
jobLauncherContext.runner.execute(job, jobParameters);
assertThat(jobLauncherContext.jobInstances()).hasSize(1);
});
}
Expand Down Expand Up @@ -134,17 +151,14 @@ void retryFailedExecutionOnNonRestartableJob() {
Job job = jobLauncherContext.jobBuilder()
.preventRestart()
.start(jobLauncherContext.stepBuilder().tasklet(throwingTasklet(), transactionManager).build())
.incrementer(new RunIdIncrementer())
.build();
jobLauncherContext.runner.execute(job, new JobParameters());
jobLauncherContext.runner.execute(job, new JobParameters());
// A failed job that is not restartable does not re-use the job params of
// the last execution, but creates a new job instance when running it again.
assertThat(jobLauncherContext.jobInstances()).hasSize(2);
JobParameters jobParameters = new JobParametersBuilder()
.addString("name", "foo").toJobParameters();
jobLauncherContext.runner.execute(job, jobParameters);
assertThat(jobLauncherContext.jobInstances()).hasSize(1);
assertThatExceptionOfType(JobRestartException.class).isThrownBy(() -> {
// try to re-run a failed execution
jobLauncherContext.runner.execute(job,
new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
jobLauncherContext.runner.execute(job, jobParameters);
fail("expected JobRestartException");
}).withMessageContaining("JobInstance already exists and is not restartable");
});
Expand All @@ -157,9 +171,8 @@ void retryFailedExecutionWithNonIdentifyingParameters() {
JobLauncherApplicationRunnerContext jobLauncherContext = new JobLauncherApplicationRunnerContext(context);
Job job = jobLauncherContext.jobBuilder()
.start(jobLauncherContext.stepBuilder().tasklet(throwingTasklet(), transactionManager).build())
.incrementer(new RunIdIncrementer())
.build();
JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false)
JobParameters jobParameters = new JobParametersBuilder().addLong("run.id", 1L, true)
.addLong("foo", 2L, false)
.toJobParameters();
jobLauncherContext.runner.execute(job, jobParameters);
Expand Down Expand Up @@ -200,7 +213,7 @@ static class JobLauncherApplicationRunnerContext {
this.jobBuilder = new JobBuilder("job", jobRepository);
this.job = this.jobBuilder.start(this.step).build();
this.jobRepository = context.getBean(JobRepository.class);
this.runner = new JobLauncherApplicationRunner(jobOperator, jobRepository);
this.runner = new JobLauncherApplicationRunner(jobOperator);
}

List<JobInstance> jobInstances() {
Expand Down
Loading