Skip to content

Explicit handling of nested splits in SplitState #3903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2020 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -300,9 +300,9 @@ else if (input instanceof Flow) {
return result;
}

private SplitState createState(Collection<Flow> flows, TaskExecutor executor) {
private SplitState createState(Collection<Flow> flows, TaskExecutor executor, SplitState parentSplit) {
if (!states.containsKey(flows)) {
states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++)));
states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++), parentSplit));
}
SplitState result = (SplitState) states.get(flows);
if (executor != null) {
Expand Down Expand Up @@ -606,7 +606,7 @@ public static class SplitBuilder<Q> {

private final FlowBuilder<Q> parent;

private TaskExecutor executor;
private final TaskExecutor executor;

/**
* @param parent the parent builder
Expand All @@ -626,23 +626,23 @@ public SplitBuilder(FlowBuilder<Q> parent, TaskExecutor executor) {
public FlowBuilder<Q> add(Flow... flows) {
Collection<Flow> list = new ArrayList<>(Arrays.asList(flows));
String name = "split" + (parent.splitCounter++);
int counter = 0;
State one = parent.currentState;
Flow flow = null;

if (one instanceof SplitState) {
parent.currentState = parent.createState(list, executor, (SplitState) one);
return parent;
}

if (!(one == null || one instanceof FlowState)) {
FlowBuilder<Flow> stateBuilder = new FlowBuilder<>(name + "_" + (counter++));
FlowBuilder<Flow> stateBuilder = new FlowBuilder<>(name + "_0");
stateBuilder.currentState = one;
flow = stateBuilder.build();
list.add(stateBuilder.build());
}
else if (one instanceof FlowState && parent.states.size() == 1) {
list.add(((FlowState) one).getFlows().iterator().next());
}

if (flow != null) {
list.add(flow);
}
State next = parent.createState(list, executor);
parent.currentState = next;
parent.currentState = parent.createState(list, executor, null);
return parent;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2013 the original author or authors.
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,9 @@
package org.springframework.batch.core.job.flow.support.state;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
Expand All @@ -32,6 +33,7 @@
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.lang.Nullable;

/**
* A {@link State} implementation that splits a {@link Flow} into multiple parallel
Expand All @@ -44,17 +46,29 @@ public class SplitState extends AbstractState implements FlowHolder {

private final Collection<Flow> flows;

private final SplitState parentSplit;

private TaskExecutor taskExecutor = new SyncTaskExecutor();

private FlowExecutionAggregator aggregator = new MaxValueFlowExecutionAggregator();
private final FlowExecutionAggregator aggregator = new MaxValueFlowExecutionAggregator();

/**
* @param flows collection of {@link Flow} instances.
* @param name the name of the state.
*/
public SplitState(Collection<Flow> flows, String name) {
this(flows, name, null);
}

/**
* @param flows collection of {@link Flow} instances.
* @param name the name of the state.
* @param parentSplit the parent {@link SplitState}.
*/
public SplitState(Collection<Flow> flows, String name, @Nullable SplitState parentSplit) {
super(name);
this.flows = flows;
this.parentSplit = parentSplit;
}

/**
Expand Down Expand Up @@ -88,12 +102,7 @@ public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception

for (final Flow flow : flows) {

final FutureTask<FlowExecution> task = new FutureTask<>(new Callable<FlowExecution>() {
@Override
public FlowExecution call() throws Exception {
return flow.start(executor);
}
});
final FutureTask<FlowExecution> task = new FutureTask<>(() -> flow.start(executor));

tasks.add(task);

Expand All @@ -106,6 +115,8 @@ public FlowExecution call() throws Exception {

}

FlowExecutionStatus parentSplitStatus = parentSplit == null ? null : parentSplit.handle(executor);

Collection<FlowExecution> results = new ArrayList<>();

// Could use a CompletionService here?
Expand All @@ -125,7 +136,11 @@ public FlowExecution call() throws Exception {
}
}

return doAggregation(results, executor);
FlowExecutionStatus flowExecutionStatus = doAggregation(results, executor);
if (parentSplitStatus != null) {
return Collections.max(Arrays.asList(flowExecutionStatus, parentSplitStatus));
}
return flowExecutionStatus;
}

protected FlowExecutionStatus doAggregation(Collection<FlowExecution> results, FlowExecutor executor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
Expand Down Expand Up @@ -168,6 +169,22 @@ void testBuildSplit() {
assertEquals(2, execution.getStepExecutions().size());
}

@Test
void testNestedSplitsWithSingleThread() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(1);

FlowBuilder<SimpleFlow> flowBuilder = new FlowBuilder<>("flow");
FlowBuilder.SplitBuilder<SimpleFlow> splitBuilder = flowBuilder.split(taskExecutor);
splitBuilder.add(new FlowBuilder<Flow>("subflow1").from(step1).end());
splitBuilder.add(new FlowBuilder<Flow>("subflow2").from(step2).end());
Job job = new JobBuilder("job").repository(jobRepository).start(flowBuilder.build()).end().build();
job.execute(execution);

assertEquals(BatchStatus.COMPLETED, execution.getStatus());
assertEquals(2, execution.getStepExecutions().size());
}

@Test
void testBuildSplitUsingStartAndAdd_BATCH_2346() {
Flow subflow1 = new FlowBuilder<Flow>("subflow1").from(step2).end();
Expand Down