From 1559ee6adbd6716ee5c556f03e179b08c2438992 Mon Sep 17 00:00:00 2001 From: Henning Poettker Date: Mon, 15 Aug 2022 17:09:27 +0200 Subject: [PATCH] Explicit handling of nested splits in SplitState --- .../batch/core/job/builder/FlowBuilder.java | 26 +++++++------- .../job/flow/support/state/SplitState.java | 35 +++++++++++++------ .../core/job/builder/FlowJobBuilderTests.java | 17 +++++++++ 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/FlowBuilder.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/FlowBuilder.java index 6116e0df35..3fd24dd605 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/FlowBuilder.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/builder/FlowBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2020 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -300,9 +300,9 @@ else if (input instanceof Flow) { return result; } - private SplitState createState(Collection flows, TaskExecutor executor) { + private SplitState createState(Collection flows, TaskExecutor executor, SplitState parentSplit) { if (!states.containsKey(flows)) { - states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++))); + states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++), parentSplit)); } SplitState result = (SplitState) states.get(flows); if (executor != null) { @@ -606,7 +606,7 @@ public static class SplitBuilder { private final FlowBuilder parent; - private TaskExecutor executor; + private final TaskExecutor executor; /** * @param parent the parent builder @@ -626,23 +626,23 @@ public SplitBuilder(FlowBuilder parent, TaskExecutor executor) { public FlowBuilder add(Flow... flows) { Collection list = new ArrayList<>(Arrays.asList(flows)); String name = "split" + (parent.splitCounter++); - int counter = 0; State one = parent.currentState; - Flow flow = null; + + if (one instanceof SplitState) { + parent.currentState = parent.createState(list, executor, (SplitState) one); + return parent; + } + if (!(one == null || one instanceof FlowState)) { - FlowBuilder stateBuilder = new FlowBuilder<>(name + "_" + (counter++)); + FlowBuilder stateBuilder = new FlowBuilder<>(name + "_0"); stateBuilder.currentState = one; - flow = stateBuilder.build(); + list.add(stateBuilder.build()); } else if (one instanceof FlowState && parent.states.size() == 1) { list.add(((FlowState) one).getFlows().iterator().next()); } - if (flow != null) { - list.add(flow); - } - State next = parent.createState(list, executor); - parent.currentState = next; + parent.currentState = parent.createState(list, executor, null); return parent; } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/support/state/SplitState.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/support/state/SplitState.java index 790afb7281..5b58503962 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/support/state/SplitState.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/support/state/SplitState.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2013 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,9 @@ package org.springframework.batch.core.job.flow.support.state; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.concurrent.Callable; +import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -32,6 +33,7 @@ import org.springframework.core.task.SyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.core.task.TaskRejectedException; +import org.springframework.lang.Nullable; /** * A {@link State} implementation that splits a {@link Flow} into multiple parallel @@ -44,17 +46,29 @@ public class SplitState extends AbstractState implements FlowHolder { private final Collection flows; + private final SplitState parentSplit; + private TaskExecutor taskExecutor = new SyncTaskExecutor(); - private FlowExecutionAggregator aggregator = new MaxValueFlowExecutionAggregator(); + private final FlowExecutionAggregator aggregator = new MaxValueFlowExecutionAggregator(); /** * @param flows collection of {@link Flow} instances. * @param name the name of the state. */ public SplitState(Collection flows, String name) { + this(flows, name, null); + } + + /** + * @param flows collection of {@link Flow} instances. + * @param name the name of the state. + * @param parentSplit the parent {@link SplitState}. + */ + public SplitState(Collection flows, String name, @Nullable SplitState parentSplit) { super(name); this.flows = flows; + this.parentSplit = parentSplit; } /** @@ -88,12 +102,7 @@ public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception for (final Flow flow : flows) { - final FutureTask task = new FutureTask<>(new Callable() { - @Override - public FlowExecution call() throws Exception { - return flow.start(executor); - } - }); + final FutureTask task = new FutureTask<>(() -> flow.start(executor)); tasks.add(task); @@ -106,6 +115,8 @@ public FlowExecution call() throws Exception { } + FlowExecutionStatus parentSplitStatus = parentSplit == null ? null : parentSplit.handle(executor); + Collection results = new ArrayList<>(); // Could use a CompletionService here? @@ -125,7 +136,11 @@ public FlowExecution call() throws Exception { } } - return doAggregation(results, executor); + FlowExecutionStatus flowExecutionStatus = doAggregation(results, executor); + if (parentSplitStatus != null) { + return Collections.max(Arrays.asList(flowExecutionStatus, parentSplitStatus)); + } + return flowExecutionStatus; } protected FlowExecutionStatus doAggregation(Collection results, FlowExecutor executor) { diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java index 7feb030a20..c68b17ddb2 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java @@ -40,6 +40,7 @@ import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.job.flow.FlowExecutionStatus; import org.springframework.batch.core.job.flow.JobExecutionDecider; +import org.springframework.batch.core.job.flow.support.SimpleFlow; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; @@ -168,6 +169,22 @@ void testBuildSplit() { assertEquals(2, execution.getStepExecutions().size()); } + @Test + void testNestedSplitsWithSingleThread() { + SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); + taskExecutor.setConcurrencyLimit(1); + + FlowBuilder flowBuilder = new FlowBuilder<>("flow"); + FlowBuilder.SplitBuilder splitBuilder = flowBuilder.split(taskExecutor); + splitBuilder.add(new FlowBuilder("subflow1").from(step1).end()); + splitBuilder.add(new FlowBuilder("subflow2").from(step2).end()); + Job job = new JobBuilder("job").repository(jobRepository).start(flowBuilder.build()).end().build(); + job.execute(execution); + + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + @Test void testBuildSplitUsingStartAndAdd_BATCH_2346() { Flow subflow1 = new FlowBuilder("subflow1").from(step2).end();