Skip to content

Commit 3281cc7

Browse files
committed
Add TaskExecutorRepeatTemplate.java as original file
1 parent 79a4e97 commit 3281cc7

File tree

1 file changed

+331
-0
lines changed

1 file changed

+331
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
/*
2+
* Copyright 2006-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.repeat.support;
18+
19+
import org.springframework.batch.repeat.RepeatCallback;
20+
import org.springframework.batch.repeat.RepeatContext;
21+
import org.springframework.batch.repeat.RepeatException;
22+
import org.springframework.batch.repeat.RepeatOperations;
23+
import org.springframework.batch.repeat.RepeatStatus;
24+
import org.springframework.core.task.SyncTaskExecutor;
25+
import org.springframework.core.task.TaskExecutor;
26+
import org.springframework.util.Assert;
27+
28+
/**
29+
* Provides {@link RepeatOperations} support including interceptors that can be used to
30+
* modify or monitor the behaviour at run time.<br>
31+
*
32+
* This implementation is sufficient to be used to configure transactional behaviour for
33+
* each item by making the {@link RepeatCallback} transactional, or for the whole batch by
34+
* making the execute method transactional (but only then if the task executor is
35+
* synchronous).<br>
36+
*
37+
* This class is thread-safe if its collaborators are thread-safe (interceptors,
38+
* terminationPolicy, callback). Normally this will be the case, but clients need to be
39+
* aware that if the task executor is asynchronous, then the other collaborators should be
40+
* also. In particular the {@link RepeatCallback} that is wrapped in the execute method
41+
* must be thread-safe - often it is based on some form of data source, which itself
42+
* should be both thread-safe and transactional (multiple threads could be accessing it at
43+
* any given time, and each thread would have its own transaction).<br>
44+
*
45+
* @author Dave Syer
46+
* @author Mahmoud Ben Hassine
47+
*
48+
*/
49+
public class TaskExecutorRepeatTemplate extends RepeatTemplate {
50+
51+
/**
52+
* Default limit for maximum number of concurrent unfinished results allowed by the
53+
* template.
54+
* {@link #getNextResult(RepeatContext, RepeatCallback, RepeatInternalState)} .
55+
*/
56+
public static final int DEFAULT_THROTTLE_LIMIT = 4;
57+
58+
private int throttleLimit = DEFAULT_THROTTLE_LIMIT;
59+
60+
private TaskExecutor taskExecutor = new SyncTaskExecutor();
61+
62+
/**
63+
* Public setter for the throttle limit. The throttle limit is the largest number of
64+
* concurrent tasks that can be executing at one time - if a new task arrives and the
65+
* throttle limit is breached we wait for one of the executing tasks to finish before
66+
* submitting the new one to the {@link TaskExecutor}. Default value is
67+
* {@link #DEFAULT_THROTTLE_LIMIT}. N.B. when used with a thread pooled
68+
* {@link TaskExecutor} the thread pool might prevent the throttle limit actually
69+
* being reached (so make the core pool size larger than the throttle limit if
70+
* possible).
71+
* @param throttleLimit the throttleLimit to set.
72+
* @deprecated since 5.0, scheduled for removal in 6.0. Use a pooled
73+
* {@link TaskExecutor} implemenation with a limited capacity of its task queue
74+
* instead.
75+
*/
76+
@Deprecated(since = "5.0", forRemoval = true)
77+
public void setThrottleLimit(int throttleLimit) {
78+
this.throttleLimit = throttleLimit;
79+
}
80+
81+
/**
82+
* Setter for task executor to be used to run the individual item callbacks.
83+
* @param taskExecutor a TaskExecutor
84+
* @throws IllegalArgumentException if the argument is null
85+
*/
86+
public void setTaskExecutor(TaskExecutor taskExecutor) {
87+
Assert.notNull(taskExecutor, "A TaskExecutor is required");
88+
this.taskExecutor = taskExecutor;
89+
}
90+
91+
/**
92+
* Use the {@link #setTaskExecutor(TaskExecutor)} to generate a result. The internal
93+
* state in this case is a queue of unfinished result holders of type
94+
* {@link ResultHolder}. The holder with the return value should not be on the queue
95+
* when this method exits. The queue is scoped in the calling method so there is no
96+
* need to synchronize access.
97+
*
98+
*/
99+
@Override
100+
protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
101+
throws Throwable {
102+
103+
ExecutingRunnable runnable;
104+
105+
ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
106+
107+
do {
108+
109+
/*
110+
* Wrap the callback in a runnable that will add its result to the queue when
111+
* it is ready.
112+
*/
113+
runnable = new ExecutingRunnable(callback, context, queue);
114+
115+
/*
116+
* Tell the runnable that it can expect a result. This could have been
117+
* in-lined with the constructor, but it might block, so it's better to do it
118+
* here, since we have the option (it's a private class).
119+
*/
120+
runnable.expect();
121+
122+
/*
123+
* Start the task possibly concurrently / in the future.
124+
*/
125+
taskExecutor.execute(runnable);
126+
127+
/*
128+
* Allow termination policy to update its state. This must happen immediately
129+
* before or after the call to the task executor.
130+
*/
131+
update(context);
132+
133+
/*
134+
* Keep going until we get a result that is finished, or early termination...
135+
*/
136+
}
137+
while (queue.isEmpty() && !isComplete(context));
138+
139+
/*
140+
* N.B. If the queue is empty then take() blocks until a result appears, and there
141+
* must be at least one because we just submitted one to the task executor.
142+
*/
143+
ResultHolder result = queue.take();
144+
if (result.getError() != null) {
145+
throw result.getError();
146+
}
147+
return result.getResult();
148+
}
149+
150+
/**
151+
* Wait for all the results to appear on the queue and execute the after interceptors
152+
* for each one.
153+
*
154+
* @see org.springframework.batch.repeat.support.RepeatTemplate#waitForResults(org.springframework.batch.repeat.support.RepeatInternalState)
155+
*/
156+
@Override
157+
protected boolean waitForResults(RepeatInternalState state) {
158+
159+
ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
160+
161+
boolean result = true;
162+
163+
while (queue.isExpecting()) {
164+
165+
/*
166+
* Careful that no runnables that are not going to finish ever get onto the
167+
* queue, else this may block forever.
168+
*/
169+
ResultHolder future;
170+
try {
171+
future = queue.take();
172+
}
173+
catch (InterruptedException e) {
174+
Thread.currentThread().interrupt();
175+
throw new RepeatException("InterruptedException while waiting for result.");
176+
}
177+
178+
if (future.getError() != null) {
179+
state.getThrowables().add(future.getError());
180+
result = false;
181+
}
182+
else {
183+
RepeatStatus status = future.getResult();
184+
result = result && canContinue(status);
185+
executeAfterInterceptors(future.getContext(), status);
186+
}
187+
188+
}
189+
190+
Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch.");
191+
192+
return result;
193+
}
194+
195+
@Override
196+
protected RepeatInternalState createInternalState(RepeatContext context) {
197+
// Queue of pending results:
198+
return new ResultQueueInternalState(throttleLimit);
199+
}
200+
201+
/**
202+
* A runnable that puts its result on a queue when it is done.
203+
*
204+
* @author Dave Syer
205+
*
206+
*/
207+
private class ExecutingRunnable implements Runnable, ResultHolder {
208+
209+
private final RepeatCallback callback;
210+
211+
private final RepeatContext context;
212+
213+
private final ResultQueue<ResultHolder> queue;
214+
215+
private volatile RepeatStatus result;
216+
217+
private volatile Throwable error;
218+
219+
public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) {
220+
221+
super();
222+
223+
this.callback = callback;
224+
this.context = context;
225+
this.queue = queue;
226+
227+
}
228+
229+
/**
230+
* Tell the queue to expect a result.
231+
*/
232+
public void expect() {
233+
try {
234+
queue.expect();
235+
}
236+
catch (InterruptedException e) {
237+
Thread.currentThread().interrupt();
238+
throw new RepeatException("InterruptedException waiting for to acquire lock on input.");
239+
}
240+
}
241+
242+
/**
243+
* Execute the batch callback, and store the result, or any exception that is
244+
* thrown for retrieval later by caller.
245+
*
246+
* @see java.lang.Runnable#run()
247+
*/
248+
@Override
249+
public void run() {
250+
boolean clearContext = false;
251+
try {
252+
if (RepeatSynchronizationManager.getContext() == null) {
253+
clearContext = true;
254+
RepeatSynchronizationManager.register(context);
255+
}
256+
257+
if (logger.isDebugEnabled()) {
258+
logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
259+
}
260+
261+
result = callback.doInIteration(context);
262+
263+
}
264+
catch (Throwable e) {
265+
error = e;
266+
}
267+
finally {
268+
269+
if (clearContext) {
270+
RepeatSynchronizationManager.clear();
271+
}
272+
273+
queue.put(this);
274+
275+
}
276+
}
277+
278+
/**
279+
* Get the result - never blocks because the queue manages waiting for the task to
280+
* finish.
281+
*/
282+
@Override
283+
public RepeatStatus getResult() {
284+
return result;
285+
}
286+
287+
/**
288+
* Get the error - never blocks because the queue manages waiting for the task to
289+
* finish.
290+
*/
291+
@Override
292+
public Throwable getError() {
293+
return error;
294+
}
295+
296+
/**
297+
* Getter for the context.
298+
*/
299+
@Override
300+
public RepeatContext getContext() {
301+
return this.context;
302+
}
303+
304+
}
305+
306+
/**
307+
* @author Dave Syer
308+
*
309+
*/
310+
private static class ResultQueueInternalState extends RepeatInternalStateSupport {
311+
312+
private final ResultQueue<ResultHolder> results;
313+
314+
/**
315+
* @param throttleLimit the throttle limit for the result queue
316+
*/
317+
public ResultQueueInternalState(int throttleLimit) {
318+
super();
319+
this.results = new ResultHolderResultQueue(throttleLimit);
320+
}
321+
322+
/**
323+
* @return the result queue
324+
*/
325+
public ResultQueue<ResultHolder> getResultQueue() {
326+
return results;
327+
}
328+
329+
}
330+
331+
}

0 commit comments

Comments
 (0)