16
16
17
17
package org .springframework .batch .repeat .support ;
18
18
19
+ import java .util .concurrent .CountDownLatch ;
20
+
19
21
import org .springframework .batch .repeat .RepeatCallback ;
20
22
import org .springframework .batch .repeat .RepeatContext ;
21
23
import org .springframework .batch .repeat .RepeatException ;
@@ -59,6 +61,8 @@ public class TaskExecutorRepeatTemplate extends RepeatTemplate {
59
61
60
62
private TaskExecutor taskExecutor = new SyncTaskExecutor ();
61
63
64
+ private final CountDownLatch latch = new CountDownLatch (1 );
65
+
62
66
/**
63
67
* Public setter for the throttle limit. The throttle limit is the largest number of
64
68
* concurrent tasks that can be executing at one time - if a new task arrives and the
@@ -110,7 +114,7 @@ protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callb
110
114
* Wrap the callback in a runnable that will add its result to the queue when
111
115
* it is ready.
112
116
*/
113
- runnable = new ExecutingRunnable (callback , context , queue );
117
+ runnable = new ExecutingRunnable (callback , context , queue , latch );
114
118
115
119
/*
116
120
* Tell the runnable that it can expect a result. This could have been
@@ -133,6 +137,9 @@ protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callb
133
137
/*
134
138
* Keep going until we get a result that is finished, or early termination...
135
139
*/
140
+ logger .debug ("Waiting for latch : " + latch .hashCode () + " (count=" + latch .getCount () + ")" );
141
+ latch .await ();
142
+ logger .debug ("Latch released : " + latch .hashCode () + " (count=" + latch .getCount () + ")" );
136
143
}
137
144
while (queue .isEmpty () && !isComplete (context ));
138
145
@@ -216,14 +223,15 @@ private class ExecutingRunnable implements Runnable, ResultHolder {
216
223
217
224
private volatile Throwable error ;
218
225
219
- public ExecutingRunnable (RepeatCallback callback , RepeatContext context , ResultQueue <ResultHolder > queue ) {
226
+ private CountDownLatch latch ;
227
+ public ExecutingRunnable (RepeatCallback callback , RepeatContext context , ResultQueue <ResultHolder > queue , CountDownLatch latch ) {
220
228
221
229
super ();
222
230
223
231
this .callback = callback ;
224
232
this .context = context ;
225
233
this .queue = queue ;
226
-
234
+ this . latch = latch ;
227
235
}
228
236
229
237
/**
@@ -272,6 +280,9 @@ public void run() {
272
280
273
281
queue .put (this );
274
282
283
+ logger .debug ("Will count down the latch : " + latch .hashCode () + " (count=" + latch .getCount () + ")" );
284
+ this .latch .countDown ();
285
+ logger .debug ("Latch updated : " + latch .hashCode () + " (count=" + latch .getCount () + ")" );
275
286
}
276
287
}
277
288
0 commit comments