1
1
/*
2
- * Copyright 2002-2013 the original author or authors.
2
+ * Copyright 2002-2016 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
24
24
import java .util .concurrent .RejectedExecutionException ;
25
25
26
26
import org .springframework .core .task .AsyncListenableTaskExecutor ;
27
+ import org .springframework .core .task .TaskDecorator ;
27
28
import org .springframework .core .task .TaskRejectedException ;
28
29
import org .springframework .util .Assert ;
29
30
import org .springframework .util .concurrent .ListenableFuture ;
@@ -45,6 +46,8 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
45
46
46
47
private final Executor concurrentExecutor ;
47
48
49
+ private TaskDecorator taskDecorator ;
50
+
48
51
49
52
/**
50
53
* Create a new TaskExecutorAdapter,
@@ -57,14 +60,29 @@ public TaskExecutorAdapter(Executor concurrentExecutor) {
57
60
}
58
61
59
62
63
+ /**
64
+ * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
65
+ * about to be executed.
66
+ * <p>Note that such a decorator is not necessarily being applied to the
67
+ * user-supplied {@code Runnable}/{@code Callable} but rather to the actual
68
+ * execution callback (which may be a wrapper around the user-supplied task).
69
+ * <p>The primary use case is to set some execution context around the task's
70
+ * invocation, or to provide some monitoring/statistics for task execution.
71
+ * @since 4.3
72
+ */
73
+ public final void setTaskDecorator (TaskDecorator taskDecorator ) {
74
+ this .taskDecorator = taskDecorator ;
75
+ }
76
+
77
+
60
78
/**
61
79
* Delegates to the specified JDK concurrent executor.
62
80
* @see java.util.concurrent.Executor#execute(Runnable)
63
81
*/
64
82
@ Override
65
83
public void execute (Runnable task ) {
66
84
try {
67
- this .concurrentExecutor . execute ( task );
85
+ doExecute ( this .concurrentExecutor , this . taskDecorator , task );
68
86
}
69
87
catch (RejectedExecutionException ex ) {
70
88
throw new TaskRejectedException (
@@ -80,12 +98,12 @@ public void execute(Runnable task, long startTimeout) {
80
98
@ Override
81
99
public Future <?> submit (Runnable task ) {
82
100
try {
83
- if (this .concurrentExecutor instanceof ExecutorService ) {
101
+ if (this .taskDecorator == null && this . concurrentExecutor instanceof ExecutorService ) {
84
102
return ((ExecutorService ) this .concurrentExecutor ).submit (task );
85
103
}
86
104
else {
87
105
FutureTask <Object > future = new FutureTask <Object >(task , null );
88
- this .concurrentExecutor . execute ( future );
106
+ doExecute ( this .concurrentExecutor , this . taskDecorator , future );
89
107
return future ;
90
108
}
91
109
}
@@ -98,12 +116,12 @@ public Future<?> submit(Runnable task) {
98
116
@ Override
99
117
public <T > Future <T > submit (Callable <T > task ) {
100
118
try {
101
- if (this .concurrentExecutor instanceof ExecutorService ) {
119
+ if (this .taskDecorator == null && this . concurrentExecutor instanceof ExecutorService ) {
102
120
return ((ExecutorService ) this .concurrentExecutor ).submit (task );
103
121
}
104
122
else {
105
123
FutureTask <T > future = new FutureTask <T >(task );
106
- this .concurrentExecutor . execute ( future );
124
+ doExecute ( this .concurrentExecutor , this . taskDecorator , future );
107
125
return future ;
108
126
}
109
127
}
@@ -117,7 +135,7 @@ public <T> Future<T> submit(Callable<T> task) {
117
135
public ListenableFuture <?> submitListenable (Runnable task ) {
118
136
try {
119
137
ListenableFutureTask <Object > future = new ListenableFutureTask <Object >(task , null );
120
- this .concurrentExecutor . execute ( future );
138
+ doExecute ( this .concurrentExecutor , this . taskDecorator , future );
121
139
return future ;
122
140
}
123
141
catch (RejectedExecutionException ex ) {
@@ -130,7 +148,7 @@ public ListenableFuture<?> submitListenable(Runnable task) {
130
148
public <T > ListenableFuture <T > submitListenable (Callable <T > task ) {
131
149
try {
132
150
ListenableFutureTask <T > future = new ListenableFutureTask <T >(task );
133
- this .concurrentExecutor . execute ( future );
151
+ doExecute ( this .concurrentExecutor , this . taskDecorator , future );
134
152
return future ;
135
153
}
136
154
catch (RejectedExecutionException ex ) {
@@ -139,4 +157,20 @@ public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
139
157
}
140
158
}
141
159
160
+
161
+ /**
162
+ * Actually execute the given {@code Runnable} (which may be a user-supplied task
163
+ * or a wrapper around a user-supplied task) with the given executor.
164
+ * @param concurrentExecutor the underlying JDK concurrent executor to delegate to
165
+ * @param taskDecorator the specified decorator to be applied, if any
166
+ * @param runnable the runnable to execute
167
+ * @throws RejectedExecutionException if the given runnable cannot be accepted
168
+ * @since 4.3
169
+ */
170
+ protected void doExecute (Executor concurrentExecutor , TaskDecorator taskDecorator , Runnable runnable )
171
+ throws RejectedExecutionException {
172
+
173
+ concurrentExecutor .execute (taskDecorator != null ? taskDecorator .decorate (runnable ) : runnable );
174
+ }
175
+
142
176
}
0 commit comments